Kafka學習(一) 初識

一,簡介java

二,Kafka的角色 apache

三,Kafka的安裝編程

  3.1 文件下載和解壓bootstrap

  3.2 文件配置網絡

  3.3 服務啓動session

四,Kafka的經常使用命令分佈式

五,Kafka的JAVA編程oop

  5.1 Producer編程網站

  5.2 Consumer編程this

 

 

正文

一,簡介

  Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由ScalaJava編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消息。

二,Kafka的角色  

  Broker : 安裝Kafka服務的那臺集羣就是一個broker(broker的id要全局惟一)
  Producer :消息的生產者,負責將數據寫入到broker中(push)
  Consumer:消息的消費者,負責從kafka中讀取數據(pull),老版本的消費者須要依賴zk,新版本的不須要
  Topic: 主題,至關因而數據的一個分類,不一樣topic存放不一樣的數據
  Consumer Group: 消費者組,一個topic能夠有多個消費者同時消費,多個消費者若是在一個消費者組中,那麼他們不能重複消費數據

三,Kafka的安裝

  3.1 文件下載和解壓

  我這裏的spark是2.3.3因此須要kafka0.10.2.0版本:點擊下載

  解壓到相應的文件夾:以下圖所示

  

  3.2 文件配置

  三個必要配置的地方:

broker.id=1  ===> 全局惟一,三臺都要配置我這裏分別是1,2,3
listeners=PLAINTEXT://hd1:9092   ===> 還有兩臺hd2,hd3
# 這個目錄本身建立,用來保存kafka的數據
log.dirs=/usr/local/hadoop/kafka/data  
zookeeper.connect=hd1:2181,hd2:2181,hd3:2181 ===> zookeeper的地址

  以下:

  

  3.3 服務啓動

./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties

四,Kafka的經常使用命令

# 啓動
./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties

# 查看有那些topic
./bin/kafka-topics.sh --list --zookeeper hd1:2181,hd2:2181,hd3:2181

# 建立topic
./bin/kafka-topics.sh --create --zookeeper hd1:2181,hd2:2181,hd3:2181 --replication-factor 3 --partitions 3 --topic test

# 生產者數據
./bin/kafka-console-producer.sh --broker-list hd1:9092,hd2:9092,hd3:9092 --topic test

# 消費者消費數據
./bin/kafka-console-consumer.sh --zookeeper hd1:2181,hd2:2181,hd3:2181 --topic test --from-beginning

五,Kafka的JAVA編程

  5.1 Producer編程

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProduceDemo {

    public static void main(String[] args){
        Properties props = new Properties();//配置項
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");//使用新的API指定kafka集羣位置
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        String messageStr = null;
        for (int i = 1;i<1000;i++){
            messageStr = "hello, this is "+i+"th message";
            producer.send(new ProducerRecord<String, String>("test","Message",messageStr));
        }
        producer.close();
    }
}

  5.2 Consumer編程

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUDID = "groupA";

    public ConsumerDemo(String topicName){
        Properties props = new Properties();
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        props.put("group.id", GROUDID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    public void run(){
        int messageNum = 1;
        try{
            for (;;){
                msgList = consumer.poll(500);
                if (msgList!=null && msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList){
                        if (messageNum % 50 ==0){
                            System.out.println(messageNum+"=receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                        }
                        if (messageNum % 1000 == 0)
                            break;
                        messageNum++;
                    }
                }
                else{
                    Thread.sleep(1000);
                }
            }
        }
        catch (InterruptedException e){
            e.printStackTrace();
        }
        finally{
            consumer.close();
        }
    }

    public static void main(String[] args){
        ConsumerDemo demo = new ConsumerDemo("test");
        Thread thread = new Thread(demo);
        thread.start();
    }
}
相關文章
相關標籤/搜索