kafka入門

基本概念:html

一、什麼是kafka?
Kafka是一個高吞吐量、分佈式的發佈訂閱消息系統。據kafka官方網站介紹,當前的kafka已經定位爲一個分佈式流式處理平臺( a distributed streaming platform),它最初由LinkedIn公司開發,後來成爲Apache項目的一部分。
kafka核心模塊使用scala語言開發,支持多語言(如java、c/c++、python、go、erlang、node.js等)客戶端,它以能夠水平擴展和具備高吞吐量等特性而被普遍使用。java

二、流式處理平臺的3大關鍵特性。
a、可以容許發佈和訂閱流數據;
b、存儲流數據時提供相應的容錯機制;
c、當流數據到達時可以被及時處理。node

3.體系結構python

 

四、kafka的名詞
a、producer:生產者
b、consumer:消費者
c、topic:消息以topic爲類別記錄,Kafka將消息種子(Feed)分門別類,每一類的消息稱之爲一個主題(Topic)
d、broker:以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker;消費者能夠訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。c++

e、分區和副本:kafka將一組消息概括爲一個主題,而每一個主題又被分爲一個或多個分區(partition)。每一個分區由一系列有序、不可變的消息組成,是一個有序隊列。
每一個分區在物理上對應爲一個文件夾,分區命名規則爲 主題名+「-」+分區編號,分區編號從0開始,到最大值減一。每一個分區又有一到多個副本(replica),分區的副本分佈在集羣的不一樣代理上,一提升可用性。

每一個消息(也叫做record記錄,也被稱爲消息)是由一個key,一個value和時間戳構成。apache

 

安裝部署:windows

一、JDK
a、最新的kafka要jdk1.7以上,官方推薦jdk1.8
b、環境變量,JAVA_HOME = D:\ProgramFiles\Java\jdk1.8.0_144
c、jdk安裝路徑不能有空格,不然kafka啓動報錯api

 

二、zookeeper
a、下載安裝包: http://zookeeper.apache.org/releases.html#download
下載後不須要安裝,直接解壓就好,個人jdk安裝在D盤,我把zookeeper也解壓在那裏。
b、環境變量,ZOOKEEPER_HOME = D:\ProgramFiles\zookeeper-3.3.6
記得添加到path系統變量下,%ZOOKEEPER_HOME%\bin;
c、修改配置文件
(1)進入目錄 D:\ProgramFiles\zookeeper-3.3.6\conf;
(2)將「zoo_sample.cfg」重命名爲「zoo.cfg」;
(3)文本編輯器打開zoo.cfg,找到並編輯 dataDir=D:\\ProgramFiles\\zookeeper-3.3.6\\tmp\\zookeeper_logs
d、在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)服務器

e、啓動zookeeper,兩種方式
(1)打開新的cmd,輸入zkServer,運行Zookeeper;
(2)進入目錄D:\ProgramFiles\zookeeper-3.3.6\bin,執行zkServer.cmd腳本。session

三、kafka
a、下載安裝包:
http://kafka.apache.org/downloads.html
下載後不須要安裝,直接解壓就好,個人jdk安裝在D盤,我把kafka也解壓在那裏。

b、修改配置文件
(1)進入Kafka配置目錄: D:\ProgramFiles\kafka_2.12-1.1.0\config
(2)文本編輯器打開 server.properties 文件,找到並編輯日誌路徑
log.dirs=D:\\ProgramFiles\\kafka_2.12-1.1.0\\tmp\\kafka-logs
(3)找到並編輯zookeeper.connect=localhost:2181。表示本地運行
(4)Kafka會按照默認,在9092端口上運行,並鏈接zookeeper的默認端口:2181。

c、運行
重要:請確保在啓動Kafka服務器前,Zookeeper實例已經準備好並開始運行。
(1)進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
(2)按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
(3)輸入
.\bin\windows\kafka-server-start.bat .\config\server.properties
而後回車

上面的Zookeeper和kafka一直打開。
一、建立主題
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 –repartition-factor 1 --partitions 1 --topic gl_test_topic
d、已建立的topic,不能夠重複建立。

上面全部的窗口要一直打開。
二、建立生產者
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic gl_test_topic_00

三、建立消費者
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic gl_test_topic_00

 

1、安裝kafka
1.安裝zookeeper
    相關配置
    log_Dir
2.安裝kafka
    相關配置
    log_Dir
3.啓動zookeeper服務
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
4.開啓kafka服務
    .\bin\windows\kafka-server-start.bat .\config\server.properties 
    ./bin/kafka-server-start.sh config/server.properties
5.建立topic
    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
  查看topic:
    ./bin/kafka-topics.sh --list --zookeeper localhost:2181
建立生產者
    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test_flink     
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_flink 
建立消費者
    .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test_flink     
    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_flink --from-beginning

2、搭建一個多個broker的集羣
    首先爲每一個節點編寫配置文件
    1.    config/server-1.properties:
        broker.id=1
        port=9093
        log.dir=/tmp/kafka-logs-1
    2.    config/server-2.properties:
        broker.id=2
        port=9094
        log.dir=/tmp/kafka-logs-2
    3.啓動另外的節點
    ./bin/kafka-server-start.sh config/server-1.properties
    ./bin/kafka-server-start.sh config/server-2.properties
    4.建立一個擁有3個副本的topic
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    
3、搭建Kafka開發環境   
1.添加依賴    
    <dependency>
        <groupId> org.apache.kafka</groupId >
        <artifactId> kafka_2.10</artifactId >
        <version> 0.8.0</ version>
    </dependency>

    
2.  
public interface KafkaProperties {
    final static String ZKCONNECT = "10.22.10.139:2181";
    final static String GROUPID = "group1";
    final static String TOPIC = "topic1";
    final static String KAFKASERVERURL = "10.22.10.139";
    final static int KAFKASERVERPORT = 9092;
    final static int KAFKAPRODUCERBUFFERSIZE = 64 * 1024;
    final static int CONNECTIONTIMEOUT = 20000;
    final static int RECONNECTINTERVAL = 10000;
    final static String TOPIC2 = "topic2";
    final static String TOPIC3 = "topic3";
    final static String CLIENTID = "SimpleConsumerDemoClient";
}

3.生產者 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer extends Thread {
    private final Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.22.10.139:9092");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}   
    
4.消費者  
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZKCONNECT);
        props.put("group.id", KafkaProperties.GROUPID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
} 
View Code
相關文章
相關標籤/搜索