Getting Started with Kafka

1. Learning from the Official Tutorial Examples
 
Kafka: a distributed message queue
 
Messaging system
Message middleware: a buffer sitting between production and consumption
When the buffer fills up, Kafka can be scaled out
 
Features:
horizontal scalability, fault tolerance, real-time, fast
 
 
Kafka architecture:
 
 
Understand producer, consumer, broker (the buffer), and topic (the label)
 
One configuration file (server.properties) corresponds to one broker
 
 
Deploying Kafka on a single node (one machine):
 
When starting things up, open several terminals so that the server (broker), producer, and consumer can each be started and watched at the same time
 
1. Single-broker deployment:
 
Preparation:
Install ZooKeeper first; after unpacking, the only change needed is zoo.cfg in the conf directory: point dataDir away from the tmp directory
Basic ZK usage: start the server with zkServer in the bin directory, then connect with zkCli
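A minimal sketch of those two steps (the dataDir path is just an example):
 
# zoo.cfg: keep the data directory out of /tmp
dataDir=/usr/local/mycode/zookeeper/data
 
# from the ZooKeeper bin directory: start the server, then connect with the CLI
./zkServer.sh start
./zkCli.sh -server localhost:2181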
 
Configure Kafka:
In the config directory:
server.properties:
broker.id
listeners
host.name
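For a single broker these can stay close to the defaults, for example:
 
broker.id=0
listeners=PLAINTEXT://:9092
host.name=localhost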
 
Startup, run from KAFKA_HOME:
Start the ZK server first
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Then start the Kafka server, passing its config file
kafka-server-start.sh $KAFKA_HOME/config/server.properties
 
Create a topic, specifying the ZooKeeper address and port:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
kafka-topics.sh --list --zookeeper localhost:2181
Show topic details:
the describe command shows which brokers are alive, which one is the leader, and so on
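For example, with the topic created above (output format is roughly as follows):
 
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
 
Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0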
 
Send messages (produce), specifying the broker list:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
 
Note: port 2181 is the ZooKeeper server, while 9092 is the broker's listener
 
Consume messages, specifying ZK:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
 
Note: with the --from-beginning flag, all historical messages are read as well
 
 
2. Multi-broker deployment:
 
Make several copies of server.properties
and change broker.id, listeners, and log.dirs in each copy (see the sample below)
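For example, the first copy might differ from the original only in these entries (ports and paths are just the ones used here):
 
# server-1.properties  (server-2 and server-3 follow the same pattern)
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1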
 
Start multiple Kafka servers:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties
 
-daemon runs the broker in the background
the trailing & returns the shell prompt so the next command can be entered right away
after a successful start, jps shows three Kafka processes
 
Create a topic with multiple replicas:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-repli
 
Producing works the same as with a single broker, except multiple broker ports are listed
 
 Fault tolerance with multiple brokers:
If the leader broker is killed, a new one is elected, so killing any single broker does not affect overall availability
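A quick way to check this (assuming broker 1 is currently the leader; substitute the pid that jps reports):
 
jps -m | grep server-1.properties
kill -9 <pid>
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-repli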
 
 
 
 
2. Development Environment with IDEA + Maven:
 
Set up the environment:
 
Create a project from the Scala template:
 
Fill in the project information:
 
Adjust the Maven settings path:
 
Once the Scala project is created,
modify the pom.xml file:
add and remove dependencies as needed
Kafka version and dependency (in pom.xml):
<properties>
  <kafka.version>0.9.0.0</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>
 
 Create a java folder and mark it as a source root (it turns blue); this can be changed from the top-right area of IDEA
 
 
 3. Writing the Kafka Producer and Consumer with the Java API:
 
 
Producer:
 
First define a class of common Kafka constants: the broker list, the ZK address, and the topic name (a consumer group id is also included because the consumer below references it; its value is arbitrary)
/*
* Kafka configuration constants, shared by the producer and the consumer
* */
public class KafkaProperties {

    // connection endpoints and topic name
    public static final String ZK = "localhost:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "localhost:9092";
    // consumer group id used by the consumer below (the value is arbitrary)
    public static final String GROUP_ID = "test_group1";
}

 

 
Then create the producer:
  1. Define fields for the topic and the producer (use the kafka.javaapi.producer package)
  2. Write a constructor that:
     - takes the topic in from the caller
     - creates the Producer, which requires a ProducerConfig object
     - builds that ProducerConfig from a java.util.Properties object
     - sets "metadata.broker.list", "serializer.class" and "request.required.acks" on that Properties object
 
Finally, start the producer from a Thread's run method to send messages
(this test sends one message every 2 seconds)
 
Implementation:
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
import java.util.Properties;
 
/*
* Kafka producer
* */
public class KafkaProducer extends Thread{
 
    private String topic;
    // use the Producer from the kafka.javaapi.producer package
    private Producer<Integer, String> producer;
 
    // constructor: take the topic and build the producer
    public KafkaProducer(String topic) {
 
        this.topic = topic;
 
        // use Properties to hold the settings that ProducerConfig needs before the Producer can be created
        // these are the broker list, the serializer, and the ack setting
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");  //此處序列化類用String
        properties.put("request.required.acks", "1");  //可設置爲0, 1, -1, 通常生產用1, 最嚴謹是-1, 不能用0
 
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }
 
    // start the producer from a thread
    @Override
    public void run() {
 
        int messageNo = 1;
 
        while(true) {
            String message = "message_" + messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("send: " + message);
 
            messageNo++;
 
            // send once every 2 seconds
            try {
                Thread.sleep(2000);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

 
 
Consumer:
 
Setup:
  1. Pass the topic in through the constructor
  2. Create a createConnector method that returns a ConsumerConnector; note it is not a Consumer directly
  3. Just as with the producer, pass the required properties, zookeeper.connect and group.id, into the ConsumerConnector config
 
Execution, by overriding Thread's run method:
  1. To create the message streams, first build a Map holding the topic and the number of Kafka streams
  2. Create the message streams and take the one returned for the topic
  3. Iterate over that stream to receive the messages
 
Implementation:
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
/*
* Kafka consumer
* */
public class KafkaConsumer extends Thread {
 
    private String topic;
 
    public KafkaConsumer(String topic) {
 
        this.topic = topic;
    }
 
    // choose ConsumerConnector from the kafka.javaapi.consumer package
    // here we create a consumer connector, not a consumer, unlike the producer side
    private ConsumerConnector createConnector() {
 
        // as on the producer side, set the properties the ConsumerConfig object needs
        // ZK must be configured
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
 
 
    // start the consumer from a thread
    @Override
    public void run() {
 
        ConsumerConnector consumer = createConnector();
 
        // createMessageStreams takes a Map, so build one
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // the map holds the topic and the number of Kafka streams
        topicCountMap.put(topic, 1);
 
        // create the message streams; their type can be seen in the source:
        // the String key is the topic, the List holds the byte streams
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
        // take the stream returned for this topic
        KafkaStream<byte[], byte[]> byteStream = messageStream.get(topic).get(0);
 
        // iterate over the data stream
        ConsumerIterator<byte[], byte[]> iterator = byteStream.iterator();
 
        while (iterator.hasNext()) {
 
            // the iterator yields bytes, so convert them to String
            String message = new String(iterator.next().message());
            System.out.println("receive:" + message);
        }
    }
}
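The notes above never show how the two threads get launched; a small entry point like the following (the class name is just an example) is enough for a local test:

/*
* Test app: start the producer and consumer threads defined above
* */
public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}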

 

 
4. A Simple Kafka Hands-On Exercise
 
Integrate Flume and Kafka for real-time data collection
 
The Kafka sink is hooked in as the producer
 
Technology choices:
Agent1: exec source -> memory channel -> avro sink
Agent2: avro source -> memory channel -> kafka sink(producer)
producer -> consumer
 
 
Configure exec-memory-avro:
 
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
 
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /usr/local/mycode/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
 
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
 
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
 
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
 
 
 
Configure avro-memory-kafka:
 
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
 
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = localhost
avro-memory-kafka.sources.avro-source.port = 44444
 
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1
 
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
 
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
 
 
 
Start the two Flume agents (note the order: start avro-memory-kafka first):
 
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf --name avro-memory-kafka -Dflume.root.logger=INFO,console
 
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console
 
 
 
Start the Kafka consumer:
 
kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic
 
The pipeline runs with some delay; wait a moment before data shows up on the consumer console
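To push data through end to end, append a few lines to the file the exec source is tailing (same path as in the exec-memory-avro config above):
 
echo "hello kafka" >> /usr/local/mycode/data/data.log
echo "flume to kafka test" >> /usr/local/mycode/data/data.log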