Kafka Consumer API Example

Since you have made it this far, I'll assume you already have a basic grasp of Kafka; this section walks through an example of using the Kafka Consumer API. As you probably know, Kafka has an old API (0.8 and earlier) and a new API (0.9 onward). This article covers the new one; the official KafkaConsumer documentation has a more detailed introduction if you want to dig deeper.

1 Environment Setup

  • Operating system: Ubuntu 16.04
  • kafka_2.11-0.10.2.2
  • JDK: 1.8.0_181
  • IntelliJ IDEA with Maven
  • VNC

2 Procedure

pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafka.test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>maven-kafka</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>

    </dependencies>

</project>

Consuming data with the Kafka consumer, without SASL authentication. The code here only consumes data that the producer is currently pushing:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

        // Kafka consumer configuration settings
        String topicName = "XXXX";
        Properties props = new Properties();
        
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");

        // To handle custom objects, specify their deserializer classes here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }

    }
}
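The loop above never exits, so the consumer is never closed. If you want a clean shutdown, one common pattern (shown below as a minimal sketch, not part of the original example) is to call wakeup() from a shutdown hook and close the consumer in a finally block; the same props and topicName from the example above are assumed:

        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName));

        // Wake the consumer up when the JVM is shutting down; the blocked poll()
        // will then throw a WakeupException and the loop can exit.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, Object> records = consumer.poll(100);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close(); // releases resources and, with auto-commit on, commits the last offsets
        }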

Consuming data with the Kafka consumer, with SASL authentication. This version can also consume historical data:

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

      
        String topicName = "XXXX";
        Properties props = new Properties();



        System.setProperty("java.security.auth.login.config", "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf"); // 環境變量添加,須要輸入配置文件的路徑
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");        
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");
        props.put("max.poll.records",1000);

        // To handle custom objects, specify their deserializer classes here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")

        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Equivalent to --from-beginning: look up the earliest offset of each
                // newly assigned partition and seek to it so historical data is read too.
                Map<TopicPartition, Long> beginningOffset = consumer.beginningOffsets(collection);
                for (Map.Entry<TopicPartition, Long> entry : beginningOffset.entrySet()) {
                    consumer.seek(entry.getKey(), entry.getValue());
                }
            }
        });

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(20000);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }


    }
}
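The SASL example points java.security.auth.login.config at a kafka_client_jaas.conf file. For the PLAIN mechanism used above, that file typically looks like the sketch below; the username and password are placeholders and must match whatever your brokers are configured to accept:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="your-username"
    password="your-password";
};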

In addition, Kafka was chosen as the message middleware here mainly so the data can be persisted to local disk or HDFS for later analysis and mining. You could use Spark Streaming to write it to HDFS; below is an example of saving the console output to local disk.

import java.io.*;


public class IO2File {
    public static void main(String[] args) throws IOException{
        File f = new File("out.json");
        f.createNewFile();
        FileOutputStream fileOutputStream = new FileOutputStream(f);
        PrintStream printStream = new PrintStream(fileOutputStream);
        System.setOut(printStream);
        System.out.println("xxxxxxx out.json");//  結合上面的代碼 直接把kafka消息的信息打印到控制檯 而後存到磁盤
    }
}
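Redirecting System.out works, but you can also write each record straight to the file inside the poll loop and skip the redirection entirely. Below is a minimal sketch of that approach; RecordsToFile and its dumpTo helper are hypothetical names, and the consumer is assumed to be configured and subscribed as in the examples above:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RecordsToFile {
    // Hypothetical helper: drains records from an already-configured consumer into a file.
    public static void dumpTo(KafkaConsumer<String, Object> consumer, String path) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter(path, true)); // append mode
        try {
            while (true) {
                ConsumerRecords<String, Object> records = consumer.poll(1000);
                for (ConsumerRecord<String, Object> record : records) {
                    writer.write(String.valueOf(record.value()));
                    writer.newLine();
                }
                writer.flush(); // flush after every poll so data is not lost on a crash
            }
        } finally {
            writer.close();
        }
    }
}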

When I find time, I'll also write up Kafka fundamentals, storage, configuration, SASL authorization, and Spark.
