總結:java
1.kafka 中能夠分步不一樣的組,消息能夠被不一樣組裏面的消費者屢次消費,也可使用queue的方式算法
2. 觀察zookeeper中kafka中的信息:api
[zk: air00:2181(CONNECTED) 8] ls /緩存
[consumers, config, controller, admin, brokers, zookeeper, controller_epoch]服務器
[zk: air00:2181(CONNECTED) 9] ls /consumers網絡
[test01, test02]session
[zk: air00:2181(CONNECTED) 10] ls /consumers/test01併發
[offsets, owners, ids]負載均衡
[zk: air00:2181(CONNECTED) 11] ls /consumers/test01/offsets異步
[test]
[zk: air00:2181(CONNECTED) 12] ls /consumers/test01/offsets/test
[1, 0]
[zk: air00:2181(CONNECTED) 13]
能夠看出消費者的信息存在於zookeeper中的節點裏面
3. 新來的消費者,不能獲取老的數據
4. 一個話題而言,不管有多少使用者訂閱了它,一條條消息都只會存儲一次
5. 線性讀取,使用磁盤緩存,消息默認保存一週
6. Linux中這是經過sendfile這個系統調用實現的。經過Java中的API,FileChannel.transferTo
7. Kafka對這種壓縮方式提供了支持。 一批消息能夠打包到一塊兒進行壓縮,而後以這種形式發送給服務器。這批消息都會被髮送給同一個消息使用者,並會在到達使用者那裏以前一直保持爲被壓縮的形式。
8.在Kafka中,咱們將該最高水位標記稱爲「偏移量」(offset)標記消息的狀態
9.在Kafka中,由使用者負責維護反映哪些消息已被使用的狀態信息(偏移量)(Zookeeper中)
10.pull 的處理方式
11. 每一個代理均可以在Zookeeper(分佈式協調系統)中註冊的一些元數據(例如,可用的主題)。生產者和消費者可使用Zookeeper發現主題和相互協調。
12.採用客戶端基於zookeeper的負載均衡能夠解決部分問題。若是這麼作就能讓生產者動態地發現新的代理,並按請求數量進行負載均衡。相似的,它還能讓生產者按照某些鍵值(key)對數據進行分區(partition)而不是隨機亂分,
生產者:
13.。在Kafka中,生產者有個選項(producer.type=async)可用指定使用異步分發出產請求(produce request)。這樣就容許用一個內存隊列(in-memory queue)把生產請求放入緩衝區,而後再以某個時間間隔或者事先配置好的批量大小將數據批量發送出去。由於通常來講數據會從一組以不一樣的數據速度生產數據的異構的機器中發佈出,因此對於代理而言,這種異步緩衝的方式有助於產生均勻一致的流量,於是會有更佳的網絡利用率和更高的吞吐量
14.kafak.producer.Partitioner接口,能夠對分區函數進行定製。在缺省狀況下使用的是隨即分區函數
15.提供基於zookeeper的代理自動發現功能 —— 經過使用zk.connect配置參數指定zookeeper的鏈接url,就可以使用基於zookeeper的代理髮現和負載均衡功能。在有些應用場合,可能不太適合於依賴zookeeper。在這種狀況下,生產者能夠從broker.list這個配置參數中得到一個代理的靜態列表,每一個生產請求會被隨即的分配給各代理分區。若是相應的代理宕機,那麼生產請求就會失敗。
16.使用者從新複雜均衡的算法可用讓小組內的全部使用者對哪一個使用者使用哪些分區達成一致意見。使用者從新負載均衡的動做每次添加或移除代理以及同一小組內的使用者時被觸發。對於一個給定的話題和一個給定的使用者小組,代理分區是在小組內的全部使用者中進行平均劃分的。一個分區老是由一個單個的使用者使用。這種設計方案簡化了實施過程。假設咱們運行多個使用者以併發的方式同時使用同一個分區,那麼在該分區上就會造成爭用(contention)的狀況,這樣一來就須要某種形式的鎖定機制。若是使用者的個數比分區多,就會出現有寫使用者根本得不到數據的狀況。在從新進行負載均衡的過程當中,咱們按照儘可能減小每一個使用者須要鏈接的代理的個數的方式,嚐嚐試着將分區分配給使用者。
package com.kafka.test;
import java.util.*;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
public class Producer01 {
public static void main(String[] args) {
String topic="test";
Properties props = new Properties(); //9092
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "air00:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
producer.send(new KeyedMessage<String, String>(topic, "test" ));
producer.close();
}
}
消費者:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class Consumer01 {
static String groupId="test01";
static String topic="test";
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect","air00:2181,air01:2181,air02:2181");
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(new String(it.next().message()));
}
}