簡介html
Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,以後成爲Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。java
Kafka架構算法
它的架構包括如下組件:apache
話題(Topic):是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。api
生產者(Producer):是可以發佈消息到話題的任何對象。服務器
服務代理(Broker):已發佈的消息保存在一組服務器中,它們被稱爲代理(Broker)或Kafka集羣。網絡
消費者(Consumer):能夠訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發佈的消息。session
Kafka存儲策略架構
1)kafka以topic來進行消息管理,每一個topic包含多個partition,每一個partition對應一個邏輯log,有多個segment組成。app
2)每一個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
3)每一個part在內存中對應一個index,記錄每一個segment中的第一條消息偏移。
4)發佈者發到某個topic的消息會被均勻的分佈到多個partition上(或根據用戶指定的路由規則進行分佈),broker收到發佈消息往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發佈時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到必定的大小後將不會再往該segment寫數據,broker會建立新的segment。
Kafka數據保留策略
1)N天前的刪除。
2)保留最近的多少Size數據。
Kafka broker
與其它消息系統不一樣,Kafka broker是無狀態的。這意味着消費者必須維護已消費的狀態信息。這些信息由消費者本身維護,broker徹底無論(有offset managerbroker管理)。
如下摘抄自kafka官方文檔:
Kafka Design
目標
1) 高吞吐量來支持高容量的事件流處理
2) 支持從離線系統加載數據
3) 低延遲的消息系統
持久化
1) 依賴文件系統,持久化到本地
2) 數據持久化到log
效率
1) 解決」small IO problem「:
使用」message set「組合消息。
server使用」chunks of messages「寫到log。
consumer一次獲取大的消息塊。
2)解決」byte copying「:
在producer、broker和consumer之間使用統一的binary message format。
使用系統的page cache。
使用sendfile傳輸log,避免拷貝。
端到端的批量壓縮(End-to-end Batch Compression)
Kafka支持GZIP和Snappy壓縮協議。
複製(Replication)
1)一個partition的複製個數(replication factor)包括這個partition的leader自己。
2)全部對partition的讀和寫都經過leader。
3)Followers經過pull獲取leader上log(message和offset)
4)若是一個follower掛掉、卡住或者同步太慢,leader會把這個follower從」in sync replicas「(ISR)列表中刪除。
5)當全部的」in sync replicas「的follower把一個消息寫入到本身的log中時,這個消息才被認爲是」committed「的。
6)若是針對某個partition的全部複製節點都掛了,Kafka默認選擇最早復活的那個節點做爲leader(這個節點不必定在ISR裏)。
Kafka在Zookeeper中爲每個partition動態的維護了一個ISR,這個ISR裏的全部replica都跟上了leader,只有ISR裏的成員纔能有被選爲leader的可能(unclean.leader.election.enable=false)。
在這種模式下,對於f+1個副本,一個Kafka topic能在保證不丟失已經commit消息的前提下容忍f個副本的失敗,在大多數使用場景下,這種模式是十分有利的。事實上,爲了容忍f個副本的失敗,「少數服從多數」的方式和ISR在commit前須要等待的副本的數量是同樣的,可是ISR須要的總的副本的個數幾乎是「少數服從多數」的方式的一半。
The Producer
發送確認
經過request.required.acks來設置,選擇是否等待消息commit(是否等待全部的」in sync replicas「都成功複製了數據)
Producer能夠經過acks
參數指定最少須要多少個Replica確認收到該消息才視爲該消息發送成功。acks
的默認值是1,即Leader收到該消息後當即告訴Producer收到該消息,此時若是在ISR中的消息複製完該消息前Leader宕機,那該條消息會丟失。
推薦的作法是,將acks
設置爲all
或者-1
,此時只有ISR中的全部Replica都收到該數據(也即該消息被Commit),Leader纔會告訴Producer該消息發送成功,從而保證不會有未知的數據丟失。
負載均衡
1)producer能夠自定義發送到哪一個partition的路由規則。默認路由規則:hash(key)%numPartitions,若是key爲null則隨機選擇一個partition。
2)自定義路由:若是key是一個user id,能夠把同一個user的消息發送到同一個partition,這時consumer就能夠從同一個partition讀取同一個user的消息。
異步批量發送
批量發送:配置很少於固定消息數目一塊兒發送而且等待時間小於一個固定延遲的數據。
The Consumer
consumer控制消息的讀取。
Push vs Pull
1) producer push data to broker,consumer pull data from broker
2) consumer pull的優勢:consumer本身控制消息的讀取速度和數量。
3) consumer pull的缺點:若是broker沒有數據,則可能要pull屢次忙等待,Kafka能夠配置consumer long pull一直等到有數據。
Consumer Position
1) 大部分消息系統由broker記錄哪些消息被消費了,但Kafka不是。
2) Kafka由consumer控制消息的消費,consumer甚至能夠回到一個old offset的位置再次消費消息。
Consumer group
每個consumer實例都屬於一個consumer group。
每一條消息只會被同一個consumer group裏的一個consumer實例消費。
不一樣consumer group能夠同時消費同一條消息。
Consumer Rebalance
Kafka consumer high level API:
Message Delivery Semantics
三種:
At most once—Messages may be lost but are never redelivered.
At least once—Messages are never lost but may be redelivered.
Exactly once—this is what people actually want, each message is delivered once and only once.
Producer:有個」acks「配置能夠控制接收的leader的在什麼狀況下就回應producer消息寫入成功。
Consumer:
* 讀取消息,寫log,處理消息。若是處理消息失敗,log已經寫入,則沒法再次處理失敗的消息,對應」At most once「。
* 讀取消息,處理消息,寫log。若是消息處理成功,寫log失敗,則消息會被處理兩次,對應」At least once「。
* 讀取消息,同時處理消息並把result和log同時寫入。這樣保證result和log同時更新或同時失敗,對應」Exactly once「。
Kafka默認保證at-least-once delivery,允許用戶實現at-most-once語義,exactly-once的實現取決於目的存儲系統,kafka提供了讀取offset,實現也沒有問題。
Distribution
Consumer Offset Tracking
1)High-level consumer記錄每一個partition所消費的maximum offset,並按期commit到offset manager(broker)。
2)Simple consumer須要手動管理offset。如今的Simple consumer Java API只支持commit offset到zookeeper。
Consumers and Consumer Groups
1)consumer註冊到zookeeper
2)屬於同一個group的consumer(group id同樣)平均分配partition,每一個partition只會被一個consumer消費。
3)當broker或同一個group的其餘consumer的狀態發生變化的時候,consumer rebalance就會發生。
Zookeeper協調控制
1)管理broker與consumer的動態加入與離開。
2)觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一個consumer group內的多個consumer的訂閱負載平衡。
3)維護消費關係及每一個partition的消費信息。
日誌壓縮(Log Compaction)
1)針對一個topic的partition,壓縮使得Kafka至少知道每一個key對應的最後一個值。
2)壓縮不會重排序消息。
3)消息的offset是不會變的。
4)消息的offset是順序的。
5)壓縮發送和接收能下降網絡負載。
6)以壓縮後的形式持久化到磁盤。
生產者代碼示例:
import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = 「192.168.2.」 + rnd.nextInt(255); String msg = runtime + 「,www.example.com,」 + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); producer.send(data); } producer.close(); } }
Partitioning Code:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
消費者代碼示例:
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } }
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); } }
關於Consumer的一個細節說明:
topicCountMap.put(topic, new Integer(a_numThreads));
這裏,若是提供的thread數目(a_numThreads)大於這個topic的partition的數目,有些thread會永遠讀不到消息。
若是若是提供的thread數目(a_numThreads)小於這個topic的partition的數目,有些thread會從多個partition讀到消息。
若是一個線程從多個partition讀取消息,沒法保證的消息的順序,只能保證從同一個partition讀取到的消息是順序的。
增長更多的進程/線程消費消息,會致使Kafka re-balance,可能會改變Partition和消費Thread的對應關係。
開發環境搭建:
https://cwiki.apache.org/confluence/display/KAFKA/Developer+Setup
一些example:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
本身管理offset:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
參考:
Kafka深度解析:http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
https://kafka.apache.org/documentation.html
https://cwiki.apache.org/confluence/display/KAFKA/Index