(These are somewhat diary-like working notes; since a good memory is no match for a written record, they are kept here for future reference.)
Kafka is a high-throughput distributed publish-subscribe messaging system. It was originally developed at LinkedIn as the foundation of LinkedIn's activity stream and operational data pipelines, and has since been adopted by many different kinds of companies for a variety of data-pipeline and messaging use cases.
kafka_2.11-0.10.0.0 is used as the example throughout.
After downloading and extracting it, change into the kafka_2.11-0.10.0.0/ directory.
For testing you can use the ZooKeeper instance bundled with Kafka:
Start: ./bin/zookeeper-server-start.sh config/zookeeper.properties &, where config/zookeeper.properties is ZooKeeper's configuration file.
Stop: ./bin/zookeeper-server-stop.sh
For better availability and reliability, though, it is best to set up a dedicated ZooKeeper cluster. For details see the separate post: Zookeeper的安裝和使用——MarchOn (ZooKeeper installation and usage).
Edit config/server.properties. The following fields usually need to be set explicitly; everything else can be left at its default:
broker.id: the broker's unique id within the cluster; must be a positive integer.
listeners (replaces host.name and port from earlier versions): make sure the host name is bound, otherwise you may hit puzzling errors such as consumers being unable to find the broker. This host name is the machine name of the Kafka server and is registered in ZooKeeper.
log.dirs: where Kafka stores its data; multiple paths are separated by commas, and spreading them over different disks improves read/write performance.
log.retention.hours: how long data files are kept; once stored data exceeds this age it is cleaned up according to log.cleanup.policy.
zookeeper.connect: the ZooKeeper connect string in hostname:port form; multiple entries are comma-separated, e.g. hostname1:port1,hostname2:port2,hostname3:port3. A chroot path may be appended, e.g. hostname1:port1,hostname2:port2,hostname3:port3/kafka; note that the /kafka node must be created in ZooKeeper beforehand, otherwise you get: java.lang.IllegalArgumentException: Path length must be > 0
The meaning and configuration of all parameters can be found at: http://orchome.com/12 and http://blog.csdn.net/lizhitao/article/details/25667831
An example configuration is shown below:
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.6.128:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/kafka_2.11-0.10.0.0/kfk_data/

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
auto.create.topics.enable=false

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=4

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Note the auto.create.topics.enable field: if it is true, a topic is created automatically the first time a producer writes to a topic that does not exist; if false, the topic must be created beforehand, otherwise the producer fails with: failed after 3 retries.
Start: bin/kafka-server-start.sh config/server.properties. In production it is best to run it as a background daemon, e.g. nohup bin/kafka-server-start.sh config/server.properties &.
Stop: bin/kafka-server-stop.sh
If the zookeeper.connect value above has no chroot path, the ZooKeeper root is used. After starting ZooKeeper and Kafka, connect to ZooKeeper with its CLI and list the root with ls /: you will find the nodes consumers, config, controller, admin, brokers, zookeeper and controller_epoch.
Their structure is as follows (for details see the post: apache kafka系列之在zookeeper中存儲結構):
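Roughly, as commonly documented for the 0.8.x to 0.10.x releases, the layout looks like this (a sketch from memory, not reproduced from the post above):

/brokers/ids/[brokerId]                               broker registration info (host, port)
/brokers/topics/[topic]/partitions/[partition]/state  partition leader and ISR
/consumers/[groupId]/ids/[consumerId]                 consumer registration (old consumer)
/consumers/[groupId]/owners/[topic]/[partition]       which consumer owns which partition
/consumers/[groupId]/offsets/[topic]/[partition]      committed offsets
/config/topics/[topic]                                per-topic configuration overrides
/controller                                           the current controller broker
/controller_epoch                                     controller epoch counter
/admin/delete_topics                                  topics pending deletion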
Kafka itself is wired to ZooKeeper, and producer and consumer state is likewise kept in ZooKeeper, so the various operations on Kafka below all go through the ZooKeeper ensemble it is connected to.
Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic (partitions, replicas, etc.): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic. This prints the partition count, replication factor, the leader of each partition, the replica assignment and the in-sync replicas.
Produce messages to a topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consume messages from a topic: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (by default a single thread consumes all partitions of the given topic)
Delete a Kafka consumer group: connect to ZooKeeper and use the rmr command, e.g. to delete the consumer group named JSI: rmr /consumers/JSI
Check consumption progress:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2
Parameters:
--group is the group.id that MirrorMaker uses when consuming the source cluster
--zkconnect is the source cluster's ZooKeeper address
--topic is the topic to check; if omitted, the consumption status of all topics is returned
一、Topic操做:
import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JTopic {
    public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
        String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName, "--partitions",
                partition + "", "--replication-factor", replication + "" };
        TopicCommand.main(options);
    }

    public static void listTopic(String zkAddr) {
        String[] options = new String[] { "--list", "--zookeeper", zkAddr };
        TopicCommand.main(options);
    }

    public static void describeTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName, };
        TopicCommand.main(options);
    }

    public static void alterTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
        TopicCommand.main(options);
    }

    // Deletes the topic by removing the corresponding path in ZooKeeper; only the ZooKeeper
    // metadata is removed, the actual data on the Kafka brokers is not deleted.
    public static void deleteTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
        DeleteTopicCommand.main(options);
    }

    public static void main(String[] args) {
        String myTestTopic = "ZsmTestTopic";
        int myPartition = 4;
        int myreplication = 1;

        // createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myreplication);
        // listTopic(ConfigureAPI.KafkaProperties.ZK);
        describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
    }

}
2. Writing (producing). When writing you can specify a key, which Kafka uses to decide which partition a message goes to. If no key is specified, Kafka essentially picks a partition more or less at random for the keyless message and then caches that partition id for subsequent sends; Kafka clears this cache periodically (by default every 10 minutes, or whenever topic metadata is requested).
package com.zsm.kfkdemo;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * A rule (a key plus a partitioner function) can be given so that messages are written to specific partitions:
 * <p>
 * 1. If the message has no key, Kafka picks a partition at random
 * </p>
 * <p>
 * 2. Otherwise, if a partitioner function is configured (via partitioner.class), that function decides the partition based on the key
 * </p>
 * <p>
 * 3. Otherwise, Kafka uses hash(key) % partitionNum to decide the partition
 * </p>
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JProducer extends Thread {
    private Producer<String, String> producer;
    private String topic;
    private final int SLEEP = 10;
    private final int msgNum = 1000;

    public JProducer(String topic) {
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);// e.g. 192.168.6.127:9092,192.168.6.128:9092
        // request.required.acks
        // 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees
        // (some data will be lost when a server fails).
        // 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server
        // acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        // -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be
        // lost as long as at least one in sync replica remains.
        props.put("request.required.acks", "-1");
        // serializer class for the message value
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // serializer class for the message key
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // custom partitioner used to route messages to partitions; if not set, Kafka uses hash(messageKey) % partitionNum
        props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
        producer = new Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        boolean isBatchWriteMode = true;
        System.out.println("isBatchWriteMode: " + isBatchWriteMode);
        if (isBatchWriteMode) {
            // batched send
            int batchSize = 100;
            List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
            for (int i = 0; i < msgNum; i++) {
                String msg = "Message_" + i;
                msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
                // msgList.add(new KeyedMessage<String, String>(topic, msg));// no key given, Kafka picks a partition itself
                if (i % batchSize == 0) {
                    producer.send(msgList);
                    System.out.println("Send->[" + msgList + "]");
                    msgList.clear();
                    try {
                        sleep(SLEEP);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
            producer.send(msgList);
        } else {
            // send one message at a time
            for (int i = 0; i < msgNum; i++) {
                KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
                // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i);// no key given, Kafka picks a partition itself
                producer.send(msg);
                System.out.println("Send->[" + msg + "]");
                try {
                    sleep(SLEEP);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

        System.out.println("send done");
    }

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();
    }
}
3. Reading (consuming). For the consumer, pay particular attention to the auto.commit.enable and auto.offset.reset settings.
package com.zsm.kfkdemo;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Multi-threaded consumption within the same consumer group can be done in two ways:
 * <p>
 * 1. Implement a single-threaded client and start several instances of it
 * </p>
 * <p>
 * 2. Pass a thread count greater than 1 for the topic in createMessageStreams, then start one thread per stream (see the sketch after this class)
 * </p>
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 20;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("auto.commit.enable", "true");// defaults to true: the consumer commits offsets periodically and ZooKeeper persists them; otherwise offsets live only in memory, and after a failure consumption resumes from the last saved offset
        props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + "");// commit the offset once every INTERVAL milliseconds
        props.put("auto.offset.reset", "largest");// what to do when there is no initial offset in ZooKeeper or if an offset is out of range
        props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
        props.put("zookeeper.sync.time.ms", "200");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));// number of threads (streams)
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);// if more than one stream was requested above, each stream needs its own thread doing the processing below

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
        while (it.hasNext()) {
            messageAndMetaData = it.next();
            System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
                    new String(messageAndMetaData.message()), new String(messageAndMetaData.key()),
                    messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }
}
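The second approach mentioned in the class comment (several streams, one worker thread per stream) is not shown in the original notes; a minimal sketch, assuming the same ConfigureAPI.KafkaProperties helper constants as above, might look like this:

package com.zsm.kfkdemo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Hypothetical sketch: ask createMessageStreams for several streams and hand each one to a worker thread.
public class JMultiThreadConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        final int threadNum = 3; // ideally no more than the topic's partition count, or some threads stay idle
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProperties.TOPIC, threadNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);

        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (final KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {
            pool.submit(new Runnable() {
                public void run() {
                    // each worker drains one stream; a stream may carry several partitions
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> mm = it.next();
                        System.out.println("partition " + mm.partition() + ", offset " + mm.offset()
                                + " -> " + new String(mm.message()));
                    }
                }
            });
        }
    }
}

As noted in the comment, choosing threadNum larger than the partition count usually just leaves the extra threads idle.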
Maven dependency for Kafka:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Kafka ships with a MirrorMaker tool for replicating data from one cluster to another. It works by consuming from the source cluster and producing to the target cluster.
When running it you supply the source cluster's ZooKeeper address (the consumer/pull side) and the target cluster's broker list (the producer/push side).
Run ./kafka-run-class.sh kafka.tools.MirrorMaker --help to see the usage, shown below:
Option                                    Description
------                                    -----------
--blacklist <Java regex (String)>         Blacklist of topics to mirror.
--consumer.config <config file>           Consumer config to consume from a
                                            source cluster. You may specify
                                            multiple of these.
--help                                    Print this message.
--num.producers <Integer: Number of       Number of producer instances (default:
  producers>                                1)
--num.streams <Integer: Number of         Number of consumption streams.
  threads>                                  (default: 1)
--producer.config <config file>           Embedded producer config.
--queue.size <Integer: Queue size in      Number of messages that are buffered
  terms of number of messages>              between the consumer and producer
                                            (default: 10000)
--whitelist <Java regex (String)>         Whitelist of topics to mirror.
./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config zsmSourceClusterConsumer.config --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"
--consumer.config: the given file needs at least the zookeeper.connect and group.id fields
--producer.config: needs at least the metadata.broker.list field, i.e. the target cluster's broker list
--whitelist: the topics to mirror
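For illustration only (hypothetical hostnames), minimal versions of the two files could contain just the required fields listed above:

# zsmSourceClusterConsumer.config - consumes from the source cluster
zookeeper.connect=src-zk1:2181,src-zk2:2181,src-zk3:2181/kafka
group.id=zsm-mirror-group

# zsmTargetClusterProducer.config - produces to the target cluster
metadata.broker.list=dst-broker1:9092,dst-broker2:9092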
The "check consumption progress" command described earlier (2.3.1) can be used to watch how far the mirroring has progressed, i.e. how much of the source cluster has been consumed.
KafkaOffsetMonitor can be used to visualize Kafka broker nodes, topics, consumers, offsets and related information in a web UI.
Using KafkaOffsetMonitor-assembly-0.2.0.jar as the example, download it and run:
#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --zk 192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181 \
  --port 8087 \
  --refresh 10.seconds \
  --retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &
Here zk is written in host1:port1,host2:port2,... form; port is the port the web UI listens on; refresh is the refresh interval; retain is how long data is kept (units: seconds, minutes, hours, days).
kafka-manager is an open-source project from Yahoo, a production-grade tool written in Scala.
The tool makes it easy to spot topics that are unevenly distributed across the cluster, or partitions that are unevenly spread over the brokers. It supports managing multiple clusters, preferred-replica election, partition reassignment and topic creation, and it is also a very handy way to get a quick overview of a cluster.
The tool runs as a clustered service and requires ZooKeeper.
Reference: http://hengyunabc.github.io/kafka-manager-install/
Building it requires downloading the source from GitHub and compiling it with sbt into an installable package; the build takes a long time and kept failing for unclear reasons, so a package already built by another user is used here (backup link).
The package is kafka-manager-1.0-SNAPSHOT.zip
> Extract:
unzip kafka-manager-1.0-SNAPSHOT.zip
> Set kafka-manager.zkhosts in conf/application.conf:
kafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"
> Start:
./bin/kafka-manager -Dconfig.file=conf/application.conf (after startup a kafka-manager node appears under the ZooKeeper root)
The default port is 9000; to use a different one, pass http.port on the command line. kafka-manager.zkhosts can also be given on the command line, e.g.:
./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"
Open the web UI, go to Cluster -> Add Cluster, and enter the ZooKeeper of the Kafka cluster you want to monitor.
1. http://www.cnblogs.com/fanweiwei/p/3689034.html (using Kafka)
2. http://orchome.com/12 (broker configuration)
3. http://blog.csdn.net/lizhitao/article/details/25667831 (broker configuration)
4. http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/ (advanced: an in-depth analysis of Kafka)
5. http://www.cnblogs.com/huxi2b/p/4757098.html?utm_source=tuicool&utm_medium=referral (how to choose the number of partitions, the key and the number of consumer threads)