Kafka集羣的安裝和使用

(有點像流水帳,好記性不如爛筆頭,權記於此以備忘)html

 

Kafka是一種高吞吐量的分佈式發佈訂閱的消息隊列系統,本來開發自LinkedIn,用做LinkedIn的活動流(ActivityStream)和運營數據處理管道(Pipeline)的基礎。如今它已被多家不一樣類型的公司做爲多種類型的數據管道和消息系統使用。java

1 Kafka消息隊列簡介

1.1 基本術語

  • Broker
    Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker[5]  
  • Topic
    每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
  • Partition
    Partition是物理上的概念,每一個Topic包含一個或多個Partition.(通常爲kafka節點數cpu的總核數)
  • Producer
    負責發佈消息到Kafka broker
  • Consumer
    消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group
    每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。

1.2 消息隊列

1.2.1 基本特性

  1. 可擴展
    • 在不須要下線的狀況下進行擴容
    • 數據流分區(partition)存儲在多個機器上
  2. 高性能
    • 單個broker就能服務上千客戶端
    • 單個broker每秒種讀/寫可達每秒幾百兆字節
    • 多個brokers組成的集羣將達到很是強的吞吐能力
    • 性能穩定,不管數據多大
    • Kafka在底層摒棄了Java堆緩存機制,採用了操做系統級別的頁緩存,同時將隨機寫操做改成順序寫,再結合Zero-Copy的特性極大地改善了IO性能
  3. 持久存儲
    • 存儲在磁盤上
    • 冗餘備份到其餘服務器上以防止丟失

1.2.2 消息格式

  1. 一個topic對應一種消息格式,所以消息用topic分類
  2. 一個topic表明的消息有1個或者多個patition(s)組成
  3. 一個partition中
    • 一個partition應該存放在一到多個server上
      • 若是隻有一個server,就沒有冗餘備份,是單機而不是集羣
      • 若是有多個server
        • 一個server爲leader,其餘servers爲followers;leader須要接受讀寫請求;followers僅做冗餘備份;leader出現故障,會自動選舉一個follower做爲leader,保證服務不中斷;每一個server均可能扮演一些partitions的leader和其它partitions的follower角色,這樣整個集羣就會達到負載均衡的效果
    • 消息按順序存放,順序不可變
    • 只能追加消息,不能插入
    • 每一個消息都有一個offset,用做消息ID, 在一個partition中惟一
    • offset有consumer保存和管理,所以讀取順序其實是徹底有consumer決定的,不必定是線性的
    • 消息有超時日期,過時則刪除

1.2.3 生產者 producer

  • producer將消息寫入kafka
  • 寫入要指定topic和partition
  • 消息如何分到不一樣的partition,算法由producer指定

1.2.4 消費者 consumer

  • consumer讀取消息並做處理
  • consumer group
    • 這個概念的引入爲了支持兩種場景:每條消息分發一個消費者,每條消息廣播給消費組的全部消費者
    • 多個consumer group訂閱一個topic,該topci的消息廣播給group內全部consumer
    • 一條消息發送到一個consumer group後,只能由該group的一個consumer接收和使用
    • 一個group中的每一個consumer對應一個partition能夠帶來以下好處
      • 能夠按照partition的數目進行併發處理
      • 每一個partition都只有一個consumer讀取,於是保證了消息被處理的順序是按照partition的存放順序進行,注意這個順序受到producer存放消息的算法影響
  •  一個Consumer能夠有多個線程進行消費,線程數應很少於topic的partition數,由於對於一個包含一或多消費線程的consumer group來講,一個partition只能分給其中的一個消費線程消費,且讓儘量多的線程能分配到partition(不過實際上真正去消費的線程及線程數仍是由線程池的調度機制來決定)。這樣若是線程數比partition數多,那麼單射分配也會有多出的線程,它們就不會消費到任何一個partition的數據而空轉耗資源 。
  • 若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣
  • 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化

2. 安裝和使用

以kafka_2.11-0.10.0.0爲例。node

下載解壓後,進入kafka_2.11-0.10.0.0/git

2.1 啓動Zookeeper

測試時可使用Kafka附帶的Zookeeper:github

啓動: ./bin/zookeeper-server-start.sh config/zookeeper.properties & ,config/zookeeper.properties是Zookeeper的配置文件。web

結束: ./bin/zookeeper-server-stop.sh 算法

不過最好本身搭建一個Zookeeper集羣,提升可用性和可靠性。詳見:Zookeeper的安裝和使用——MarchOnexpress

2.2 啓動Kafka服務器

2.2.1 配置文件

配置config/server.properties文件,通常須要配置以下字段,其餘按默認便可:apache

broker.id:          每個broker在集羣中的惟一表示,要求是正數
listeners(效果同以前的版本的host.name及port):注意綁定host.name,不然可能出現莫名其妙的錯誤如consumer找不到broker。這個host.name是Kafka的server的機器名字,會註冊到Zookeeper中
log.dirs:           kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分佈在不一樣磁盤上能夠提升讀寫性能
log.retention.hours:    數據文件保留多長時間, 存儲的最大時間超過這個時間會根據log.cleanup.policy設置數據清除策略
zookeeper.connect:     指定ZooKeeper的connect string,以hostname:port的形式,可有多個以逗號分隔,如hostname1:port1,hostname2:port2,hostname3:port3,還可有路徑,如:hostname1:port1,hostname2:port2,hostname3:port3/kafka,注意要事先在zk中建立/kafka節點,不然會報出錯誤:java.lang.IllegalArgumentException: Path length must be > 0

全部參數的含義及配置可參考:http://orchome.com/12http://blog.csdn.net/lizhitao/article/details/25667831api

 一個配置示例以下:

  1 # Licensed to the Apache Software Foundation (ASF) under one or more
  2 # contributor license agreements.  See the NOTICE file distributed with
  3 # this work for additional information regarding copyright ownership.
  4 # The ASF licenses this file to You under the Apache License, Version 2.0
  5 # (the "License"); you may not use this file except in compliance with
  6 # the License.  You may obtain a copy of the License at
  7 #
  8 #    http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 # Unless required by applicable law or agreed to in writing, software
 11 # distributed under the License is distributed on an "AS IS" BASIS,
 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 # See the License for the specific language governing permissions and
 14 # limitations under the License.
 15 # see kafka.server.KafkaConfig for additional details and defaults
 16 
 17 ############################# Server Basics #############################
 18 
 19 # The id of the broker. This must be set to a unique integer for each broker.
 20 broker.id=1
 21 
 22 ############################# Socket Server Settings #############################
 23 
 24 # The address the socket server listens on. It will get the value returned from 
 25 # java.net.InetAddress.getCanonicalHostName() if not configured.
 26 #   FORMAT:
 27 #     listeners = security_protocol://host_name:port
 28 #   EXAMPLE:
 29 #     listeners = PLAINTEXT://your.host.name:9092
 30 listeners=PLAINTEXT://192.168.6.128:9092
 31 
 32 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 33 # it uses the value for "listeners" if configured.  Otherwise, it will use the value
 34 # returned from java.net.InetAddress.getCanonicalHostName().
 35 #advertised.listeners=PLAINTEXT://your.host.name:9092
 36 
 37 # The number of threads handling network requests
 38 num.network.threads=3
 39 
 40 # The number of threads doing disk I/O
 41 num.io.threads=8
 42 
 43 # The send buffer (SO_SNDBUF) used by the socket server
 44 socket.send.buffer.bytes=102400
 45 
 46 # The receive buffer (SO_RCVBUF) used by the socket server
 47 socket.receive.buffer.bytes=102400
 48 
 49 # The maximum size of a request that the socket server will accept (protection against OOM)
 50 socket.request.max.bytes=104857600
 51 
 52 
 53 ############################# Log Basics #############################
 54 
 55 # A comma seperated list of directories under which to store log files
 56 log.dirs=/usr/local/kafka/kafka_2.11-0.10.0.0/kfk_data/
 57 
 58 # The default number of log partitions per topic. More partitions allow greater
 59 # parallelism for consumption, but this will also result in more files across
 60 # the brokers.
 61 num.partitions=2
 62 auto.create.topics.enable=false
 63 
 64 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
 65 # This value is recommended to be increased for installations with data dirs located in RAID array.
 66 num.recovery.threads.per.data.dir=1
 67 
 68 ############################# Log Flush Policy #############################
 69 
 70 # Messages are immediately written to the filesystem but by default we only fsync() to sync
 71 # the OS cache lazily. The following configurations control the flush of data to disk.
 72 # There are a few important trade-offs here:
 73 #    1. Durability: Unflushed data may be lost if you are not using replication.
 74 #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
 75 #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
 76 # The settings below allow one to configure the flush policy to flush data after a period of time or
 77 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 78 
 79 # The number of messages to accept before forcing a flush of data to disk
 80 #log.flush.interval.messages=10000
 81 
 82 # The maximum amount of time a message can sit in a log before we force a flush
 83 #log.flush.interval.ms=1000
 84 
 85 ############################# Log Retention Policy #############################
 86 
 87 # The following configurations control the disposal of log segments. The policy can
 88 # be set to delete segments after a period of time, or after a given size has accumulated.
 89 # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
 90 # from the end of the log.
 91 
 92 # The minimum age of a log file to be eligible for deletion
 93 log.retention.hours=4
 94 
 95 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
 96 # segments don't drop below log.retention.bytes.
 97 #log.retention.bytes=1073741824
 98 
 99 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
100 log.segment.bytes=1073741824
101 
102 # The interval at which log segments are checked to see if they can be deleted according
103 # to the retention policies
104 log.retention.check.interval.ms=300000
105 
106 ############################# Zookeeper #############################
107 
108 # Zookeeper connection string (see zookeeper docs for details).
109 # This is a comma separated host:port pairs, each corresponding to a zk
110 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
111 # You can also append an optional chroot string to the urls to specify the
112 # root directory for all kafka znodes.
113 zookeeper.connect=192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181
114 
115 # Timeout in ms for connecting to zookeeper
116 zookeeper.connection.timeout.ms=6000
View Code

注意auto.create.topics.enable字段,若爲true則若是producer寫入某個不存在的topic時會自動建立該topic,若爲false則須要事先建立不然會報錯:failed after 3 retries。

2.2.2 命令

啓動: bin/kafka-server-start.sh config/server.properties ,生產環境最好以守護程序啓動:nohup  &

結束: bin/kafka-server-stop.sh 

2.2.3 Kafka在Zookeeper中的存儲結構

若上述的zookeeper.connect的值沒有路徑,則爲根路徑,啓動Zookeeper和Kafka,命令行鏈接Zookeeper後,用 get / 命令可發現有 consumers、config、controller、admin、brokers、zookeeper、controller_epoch 這幾個目錄。

其結構以下:(具體可參考:apache kafka系列之在zookeeper中存儲結構

 

2.3 使用

kafka自己是和zookeeper相連的,而對應producer和consumer的狀態保存也都是經過zookeeper完成的。對Kafka的各類操做經過其所鏈接的Zookeeper完成。

2.3.1 命令行客戶端

建立topic: bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  --replication-factor 1  --partitions  1  --topic test 

列出全部topic: bin/kafka-topics.sh --list --zookeeper localhost:2181 

查看topic信息(包括分區、副本狀況等): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ,會列出分區數、副本數、副本leader節點、副本節點、活着的副本節點

往某topic生產消息: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

從某topic消費消息: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (默認用一個線程消費指定topic的全部分區的數據)

刪除某個Kafka groupid:鏈接Zookeeper後用rmr命令,如刪除名爲JSI的消費組: rmr /consumers/JSI 

查看消費進度

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2
    各參數:
    --group指MirrorMaker消費源集羣時指定的group.id
    -zkconnect指源集羣的zookeeper地址
    --topic指定查的topic,沒指定則返回全部topic的消費狀況

2.3.2 Java客戶端

一、Topic操做:

 1 import kafka.admin.DeleteTopicCommand;
 2 import kafka.admin.TopicCommand;
 3 
 4 /**
 5  * @author zsm
 6  * @date 2016年9月27日 上午10:26:42
 7  * @version 1.0
 8  * @parameter
 9  * @since
10  * @return
11  */
12 public class JTopic {
13     public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
14         String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName, "--partitions",
15                 partition + "", "--replication-factor", replication + "" };
16         TopicCommand.main(options);
17     }
18 
19     public static void listTopic(String zkAddr) {
20         String[] options = new String[] { "--list", "--zookeeper", zkAddr };
21         TopicCommand.main(options);
22     }
23 
24     public static void describeTopic(String zkAddr, String topicName) {
25         String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName, };
26         TopicCommand.main(options);
27     }
28 
29     public static void alterTopic(String zkAddr, String topicName) {
30         String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
31         TopicCommand.main(options);
32     }
33 
34     // 經過刪除zk裏面對應的路徑來實現刪除topic的功能,只會刪除zk裏面的信息,Kafka上真實的數據並無刪除
35     public static void deleteTopic(String zkAddr, String topicName) {
36         String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
37         DeleteTopicCommand.main(options);
38     }
39 
40     public static void main(String[] args) {
41         // TODO Auto-generated method stub
42 
43         String myTestTopic = "ZsmTestTopic";
44         int myPartition = 4;
45         int myreplication = 1;
46 
47         //createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myreplication);
48         // listTopic(ConfigureAPI.KafkaProperties.ZK);
49         describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
50         // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
51         // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
52     }
53 
54 }
View Code

二、寫:(寫時能夠指定key以供Kafka根據key將數據寫入某個分區,若無指定,則幾乎就是隨機找一個分區發送無key的消息,而後把這個分區號加入到緩存中以備後面直接使用——固然,Kafka自己也會清空該緩存(默認每10分鐘或每次請求topic元數據時))

  1 package com.zsm.kfkdemo;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.Properties;
  6 
  7 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
  8 
  9 import kafka.javaapi.producer.Producer;
 10 import kafka.producer.KeyedMessage;
 11 import kafka.producer.ProducerConfig;
 12 
 13 /**
 14  * 能夠指定規則(key和分區函數)以讓消息寫到特定分區:
 15  * <p>
 16  * 一、若發送的消息沒有指定key則Kafka會隨機選擇一個分區
 17  * </p>
 18  * <p>
 19  * 二、不然,若指定了分區函數(經過partitioner.class)則該函數以key爲參數肯定寫到哪一個分區
 20  * </p>
 21  * <p>
 22  * 三、不然,Kafka根據hash(key)%partitionNum肯定寫到哪一個分區
 23  * </p>
 24  * 
 25  * @author zsm
 26  * @date 2016年9月27日 上午10:26:42
 27  * @version 1.0
 28  * @parameter
 29  * @since
 30  * @return
 31  */
 32 public class JProducer extends Thread {
 33     private Producer<String, String> producer;
 34     private String topic;
 35     private final int SLEEP = 10;
 36     private final int msgNum = 1000;
 37 
 38     public JProducer(String topic) {
 39         Properties props = new Properties();
 40         props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);// 如192.168.6.127:9092,192.168.6.128:9092
 41         // request.required.acks
 42         // 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees
 43         // (some data will be lost when a server fails).
 44         // 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server
 45         // acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
 46         // -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be
 47         // lost as long as at least one in sync replica remains.
 48         props.put("request.required.acks", "-1");
 49         // 配置value的序列化類
 50         props.put("serializer.class", "kafka.serializer.StringEncoder");
 51         // 配置key的序列化類
 52         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
 53         // 提供自定義的分區函數將消息寫到分區上,未指定的話Kafka根據hash(messageKey)%partitionNum肯定寫到哪一個分區
 54         props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
 55         producer = new Producer<String, String>(new ProducerConfig(props));
 56         this.topic = topic;
 57     }
 58 
 59     @Override
 60     public void run() {
 61         boolean isBatchWriteMode = true;
 62         System.out.println("isBatchWriteMode: " + isBatchWriteMode);
 63         if (isBatchWriteMode) {
 64             // 批量發送
 65             int batchSize = 100;
 66             List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
 67             for (int i = 0; i < msgNum; i++) {
 68                 String msg = "Message_" + i;
 69                 msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
 70                 // msgList.add(new KeyedMessage<String, String>(topic, msg));//未指定key,Kafka會自動選擇一個分區
 71                 if (i % batchSize == 0) {
 72                     producer.send(msgList);
 73                     System.out.println("Send->[" + msgList + "]");
 74                     msgList.clear();
 75                     try {
 76                         sleep(SLEEP);
 77                     } catch (Exception ex) {
 78                         ex.printStackTrace();
 79                     }
 80                 }
 81             }
 82             producer.send(msgList);
 83         } else {
 84             // 單個發送
 85             for (int i = 0; i < msgNum; i++) {
 86                 KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
 87                 // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i);//未指定key,Kafka會自動選擇一個分區
 88                 producer.send(msg);
 89                 System.out.println("Send->[" + msg + "]");
 90                 try {
 91                     sleep(SLEEP);
 92                 } catch (Exception ex) {
 93                     ex.printStackTrace();
 94                 }
 95             }
 96         }
 97 
 98         System.out.println("send done");
 99     }
100 
101     public static void main(String[] args) {
102         JProducer pro = new JProducer(KafkaProperties.TOPIC);
103         pro.start();
104     }
105 }
View Code

三、讀:(對於Consumer,須要注意 auto.commit.enable 和 auto.offset.reset 這兩個字段)

 1 package com.zsm.kfkdemo;
 2 
 3 import java.text.MessageFormat;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 import java.util.Properties;
 8 
 9 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
10 
11 import kafka.consumer.Consumer;
12 import kafka.consumer.ConsumerConfig;
13 import kafka.consumer.ConsumerIterator;
14 import kafka.consumer.KafkaStream;
15 import kafka.javaapi.consumer.ConsumerConnector;
16 import kafka.message.MessageAndMetadata;
17 
18 /**
19  * 同一consumer group的多線程消費能夠兩種方法實現:
20  * <p>
21  * 一、實現單線程客戶端,啓動多個去消費
22  * </p>
23  * <p>
24  * 二、在客戶端的createMessageStreams裏爲topic指定大於1的線程數,再啓動多個線程處理每一個stream
25  * </p>
26  * 
27  * @author zsm
28  * @date 2016年9月27日 上午10:26:42
29  * @version 1.0
30  * @parameter
31  * @since
32  * @return
33  */
34 public class JConsumer extends Thread {
35 
36     private ConsumerConnector consumer;
37     private String topic;
38     private final int SLEEP = 20;
39 
40     public JConsumer(String topic) {
41         consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
42         this.topic = topic;
43     }
44 
45     private ConsumerConfig consumerConfig() {
46         Properties props = new Properties();
47         props.put("zookeeper.connect", KafkaProperties.ZK);
48         props.put("group.id", KafkaProperties.GROUP_ID);
49         props.put("auto.commit.enable", "true");// 默認爲true,讓consumer按期commit offset,zookeeper會將offset持久化,不然只在內存,若故障則再消費時會從最後一次保存的offset開始
50         props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + "");// 通過INTERVAL時間提交一次offset
51         props.put("auto.offset.reset", "largest");// What to do when there is no initial offset in ZooKeeper or if an offset is out of range
52         props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
53         props.put("zookeeper.sync.time.ms", "200");
54         return new ConsumerConfig(props);
55     }
56 
57     @Override
58     public void run() {
59         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
60         topicCountMap.put(topic, new Integer(1));// 線程數
61         Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
62         KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);// 若上面設了多個線程去消費,則這裏需爲每一個stream開個線程作以下的處理
63 
64         ConsumerIterator<byte[], byte[]> it = stream.iterator();
65         MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
66         while (it.hasNext()) {
67             messageAndMetaData = it.next();
68             System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
69                     new String(messageAndMetaData.message()), new String(messageAndMetaData.key()),
70                     messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
71             try {
72                 sleep(SLEEP);
73             } catch (Exception ex) {
74                 ex.printStackTrace();
75             }
76         }
77     }
78 
79     public static void main(String[] args) {
80         JConsumer con = new JConsumer(KafkaProperties.TOPIC);
81         con.start();
82     }
83 }
View Code

與Kafka相關的Maven依賴:

 1         <dependency>
 2             <groupId>org.apache.kafka</groupId>
 3             <artifactId>kafka_2.9.2</artifactId>
 4             <version>0.8.1.1</version>
 5             <exclusions>
 6                 <exclusion>
 7                     <groupId>com.sun.jmx</groupId>
 8                     <artifactId>jmxri</artifactId>
 9                 </exclusion>
10                 <exclusion>
11                     <groupId>com.sun.jdmk</groupId>
12                     <artifactId>jmxtools</artifactId>
13                 </exclusion>
14                 <exclusion>
15                     <groupId>javax.jms</groupId>
16                     <artifactId>jms</artifactId>
17                 </exclusion>
18             </exclusions>
19         </dependency>
View Code

 

3 MirrorMaker

Kafka自身提供的MirrorMaker工具用於把一個集羣的數據同步到另外一集羣,其原理就是對源集羣消費、對目標集羣生產。

運行時須要指定源集羣的Zookeeper地址(pull模式)或目標集羣的Broker列表(push模式)。

3.1 使用

運行 ./kafka-run-class.sh kafka.tools.MirrorMaker --help 查看使用說明,以下:

 1 Option                                  Description                            
 2 ------                                  -----------                            
 3 --blacklist <Java regex (String)>       Blacklist of topics to mirror.         
 4 --consumer.config <config file>         Consumer config to consume from a      
 5                                           source cluster. You may specify      
 6                                           multiple of these.                   
 7 --help                                  Print this message.                    
 8 --num.producers <Integer: Number of     Number of producer instances (default: 
 9   producers>                              1)                                   
10 --num.streams <Integer: Number of       Number of consumption streams.         
11   threads>                                (default: 1)                         
12 --producer.config <config file>         Embedded producer config.              
13 --queue.size <Integer: Queue size in    Number of messages that are buffered   
14   terms of number of messages>            between the consumer and producer    
15                                           (default: 10000)                     
16 --whitelist <Java regex (String)>       Whitelist of topics to mirror.
View Code

3.2 啓動

./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config  zsmSourceClusterConsumer.config  --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"
    --consumer.config所指定的文件裏至少須要有zookeeper.connect、group.id兩字段
    --producer.config至少須要有metadata.broker.list字段,指定目標集羣的brooker列表
    --whitelist指定要同步的topic

能夠用2.3.1所說的查看消費進度來查看對原集羣的同步情況(即消費情況)。

 

4 Kafka監控工具(KafkaOffsetMonitor)

能夠藉助KafkaOffsetMonitor來圖形化展現Kafka的broker節點、topic、consumer及offset等信息。

以KafkaOffsetMonitor-assembly-0.2.0.jar爲例,下載後執行:

#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk 192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181 \ --port 8087 \ --refresh 10.seconds \ --retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &

其中,zk按照host1:port1,host2:port2…的格式去寫便可,port爲開啓web界面的端口號,refresh爲刷新時間,retain爲數據保留時間(單位seconds, minutes, hours, days)

 

5 Kafka集羣管理工具(Kafka Manager)

kafka-manager是yahoo開源出來的項目,屬於商業級別的工具用Scala編寫。

這個管理工具能夠很容易地發現分佈在集羣中的哪些topic分佈不均勻,或者是分區在整個集羣分佈不均勻的的狀況。它支持管理多個集羣、選擇副本、副本從新分配以及建立Topic。同時,這個管理工具也是一個很是好的能夠快速瀏覽這個集羣的工具。

此工具以集羣的方式運行,須要Zookeeper。

參考資料:http://hengyunabc.github.io/kafka-manager-install/

5.1 安裝

須要從Github下載源碼並安裝sbt工具編譯生成安裝包,生成的時間很長且不知爲什麼一直出錯,因此這裏用網友已編譯好的包 (備份連接)。

包爲kafka-manager-1.0-SNAPSHOT.zip

>解壓:

 unzip kafka-manager-1.0-SNAPSHOT.zip 

>配置conf/application.conf裏的kafka-manager.zkhosts:

 kafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

>啓動:

 ./bin/kafka-manager -Dconfig.file=conf/application.conf (啓動後在Zookeeper根目錄下可發現增長了kafka-manager目錄)

默認是9000端口,要使用其餘端口能夠在命令行指定http.port,此外kafka-manager.zkhosts也能夠在命令行指定,如:

 ./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

5.2 使用

訪問web頁面,在Cluster->Add Cluster,輸入要監控的Kafka集羣的Zookeeper便可。

6 進階

  • 在當前的kafka版本實現中,對於zookeeper的全部操做都是由kafka controller來完成的(serially的方式)
  • offset管理:kafka會記錄offset到zk中。可是,zk client api對zk的頻繁寫入是一個低效的操做。0.8.2 kafka引入了native offset storage,將offset管理從zk移出,而且能夠作到水平擴展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic與partion的組合做爲key直接提交到compacted topic中。同時Kafka又在內存中維護了三元組來維護最新的offset信息,consumer來取最新offset信息時直接從內存拿便可。固然,kafka容許你快速checkpoint最新的offset信息到磁盤上。
  • 如何肯定分區數:分區數的肯定與硬件、軟件、負載狀況等都有關,要視具體狀況而定,不過依然能夠遵循必定的步驟來嘗試肯定分區數:建立一個只有1個分區的topic,而後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位是MB/s。而後假設總的目標吞吐量是Tt,那麼分區數 =  Tt / max(Tp, Tc)

7 參考資料

一、http://www.cnblogs.com/fanweiwei/p/3689034.html(Kafka的使用)

二、http://orchome.com/12(Broker的配置)

三、http://blog.csdn.net/lizhitao/article/details/25667831(Broker的配置)

四、http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/(進階——Kafka深度解析)

五、http://www.cnblogs.com/huxi2b/p/4757098.html?utm_source=tuicool&utm_medium=referral(如何肯定分區數、key、consumer線程數)

相關文章
相關標籤/搜索