本文轉載自http://www.cnblogs.com/cyfonly/p/5954614.html html
1.解耦:
容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。 2.冗餘: 消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 3.擴展性: 由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。 4.靈活性 & 峯值處理能力: 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。 5.可恢復性: 系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。 6.順序保證: 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性) 7.緩衝: 有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。 8.異步通訊: 不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
以下圖:java
圖.1算法
如圖.1中,kafka 相關名詞解釋以下:apache
1.producer: 消息生產者,發佈消息到 kafka 集羣的終端或服務。 2.broker: kafka 集羣中包含的服務器。 3.topic: 每條發佈到 kafka 集羣的消息屬於的類別,即 kafka 是面向 topic 的。 4.partition: partition 是物理上的概念,每一個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。 5.consumer: 從 kafka 集羣中消費消息的終端或服務。 6.Consumer group: high-level consumer API 中,每一個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但能夠被多個 consumer group 消費。 7.replica: partition 的副本,保障 partition 的高可用。 8.leader: replica 中的一個角色, producer 和 consumer 只跟 leader 交互。 9.follower: replica 中的一個角色,從 leader 中複製數據。 10.controller: kafka 集羣中的其中一個服務器,用來進行 leader election 以及 各類 failover。 12.zookeeper: kafka 經過 zookeeper 來存儲集羣的 meta 信息。
kafka 在 zookeeper 中的存儲結構以下圖所示:安全
圖.2服務器
producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。網絡
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪個 partition。其路由機制爲:多線程
1. 指定了 patition,則直接使用; 2. 未指定 patition 但指定 key,經過對 key 的 value 進行hash 選出一個 patition 3. patition 和 key 都未指定,使用輪詢選出一個 patition。
附上 java 客戶端分區源碼,一目瞭然:架構
//建立消息實例 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException("Invalid timestamp " + timestamp); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; } //計算 patition,若是指定了 patition 則直接使用,不然使用 key 計算 private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); if (partition != null) { List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); int lastPartition = partitions.size() - 1; if (partition < 0 || partition > lastPartition) { throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition)); } return partition; } return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } // 使用 key 選取 patition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { //對 keyBytes 進行 hash 選出一個 patition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
producer 寫入消息序列圖以下所示:app
圖.3
流程說明:
1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader 2. producer 將消息發送給該 leader 3. leader 將消息寫入本地 log 4. followers 從 leader pull 消息,寫入本地 log 後 leader 發送 ACK 5. leader 收到全部 ISR 中的 replica 的 ACK 後,增長 HW(high watermark,最後 commit 的 offset) 並向 producer 發送 ACK
通常狀況下存在三種狀況:
1. At most once 消息可能會丟,但毫不會重複傳輸 2. At least one 消息毫不會丟,但可能會重複傳輸 3. Exactly once 每條消息確定會被傳輸一次且僅傳輸一次
當 producer 向 broker 發送消息時,一旦這條消息被 commit,因爲 replication 的存在,它就不會丟。可是若是 producer 發送數據給 broker 後,遇到網絡問題而形成通訊中斷,那 Producer 就沒法判斷該條消息是否已經 commit。雖然 Kafka 沒法肯定網絡故障期間發生了什麼,可是 producer 能夠生成一種相似於主鍵的東西,發生故障時冪等性的重試屢次,這樣就作到了 Exactly once,但目前還並未實現。因此目前默認狀況下一條消息從 producer 到 broker 是確保了 At least once,可經過設置 producer 異步發送實現At most once。
物理上把 topic 分紅一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每一個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的全部消息和索引文件),以下:
圖.4
不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:
1. 基於時間:log.retention.hours=168 2. 基於大小:log.retention.bytes=1073741824
須要注意的是,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升 Kafka 性能無關。
建立 topic 的序列圖以下所示:
圖.5
流程說明:
1. controller 在 ZooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被建立,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。 2. controller從 /brokers/ids 讀取當前全部可用的 broker 列表,對於 set_p 中的每個 partition: 2.1 從分配給該 partition 的全部 replica(稱爲AR)中任選一個可用的 broker 做爲新的 leader,並將AR設置爲新的 ISR 2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 3. controller 經過 RPC 向相關的 broker 發送 LeaderAndISRRequest。
刪除 topic 的序列圖以下所示:
圖.6
流程說明:
1. controller 在 zooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被刪除,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。 2. 若 delete.topic.enable=false,結束;不然 controller 註冊在 /admin/delete_topics 上的 watch 被 fire,controller 經過回調向對應的 broker 發送 StopReplicaRequest。
如圖.1所示,同一個 partition 可能會有多個 replica(對應 server.properties 配置中的 default.replication.factor=N)。沒有 replica 的狀況下,一旦 broker 宕機,其上全部 patition 的數據都不可被消費,同時 producer 也不能再將數據存於其上的 patition。引入replication 以後,同一個 partition 可能會有多個 replica,而這時須要在這些 replica 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互,其它 replica 做爲 follower 從 leader 中複製數據。
Kafka 分配 Replica 的算法以下:
1. 將全部 broker(假設共 n 個 broker)和待分配的 partition 排序 2. 將第 i 個 partition 分配到第(i mod n)個 broker 上 3. 將第 i 個 partition 的第 j 個 replica 分配到第((i + j) mode n)個 broker上
當 partition 對應的 leader 宕機時,須要從 follower 中選舉出新 leader。在選舉新leader時,一個基本的原則是,新的 leader 必須擁有舊 leader commit 過的全部消息。
kafka 在 zookeeper 中(/brokers/.../state)動態維護了一個 ISR(in-sync replicas),由3.3節的寫入流程可知 ISR 裏面的全部 replica 都跟上了 leader,只有 ISR 裏面的成員才能選爲 leader。對於 f+1 個 replica,一個 partition 能夠在容忍 f 個 replica 失效的狀況下保證消息不丟失。
當全部 replica 都不工做時,有兩種可行的方案:
1. 等待 ISR 中的任一個 replica 活過來,並選它做爲 leader。可保障數據不丟失,但時間可能相對較長。 2. 選擇第一個活過來的 replica(不必定是 ISR 成員)做爲 leader。沒法保障數據不丟失,但相對不可用時間較短。
kafka 0.8.* 使用第二種方式。
kafka 經過 Controller 來選舉 leader,流程請參考5.3節。
kafka broker failover 序列圖以下所示:
圖.7
流程說明:
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch 2. controller 從 /brokers/ids 節點讀取可用broker 3. controller決定set_p,該集合包含宕機 broker 上的全部 partition 4. 對 set_p 中的每個 partition 4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR 4.2 決定新 leader(如4.3節所描述) 4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點 5. 經過 RPC 向相關 broker 發送 leaderAndISRRequest 命令
當 controller 宕機時會觸發 controller failover。每一個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,全部存活的 broker 收到 fire 的通知,每一個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選爲 controller。
當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成以下操做:
1. 讀取並增長 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上註冊 watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上註冊 watcher。 4. 經過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上註冊 watcher。 5. 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上註冊 watcher。 6. 經過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上註冊Watch。 7. 初始化 ControllerContext 對象,設置當前全部 topic,「活」着的 broker 列表,全部 partition 的 leader 及 ISR等。 8. 啓動 replicaStateMachine 和 partitionStateMachine。 9. 將 brokerState 狀態設置爲 RunningAsController。 10. 將每一個 partition 的 Leadership 信息發送給全部「活」着的 broker。 11. 若 auto.leader.rebalance.enable=true(默認值是true),則啓動 partition-rebalance 線程。 12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。
kafka 提供了兩套 consumer API:
1. The high-level Consumer API 2. The SimpleConsumer API
其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則須要開發人員更多地關注細節。
high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且 consumer 消費消息時不關注 offset,最後一個 offset 由 zookeeper 保存。
使用 high-level consumer API 能夠是多線程的應用,應當注意:
1. 若是消費線程大於 patition 數量,則有些線程將收不到消息 2. 若是 patition 數量大於線程數,則有些線程多收到多個 patition 的消息 3. 若是一個線程消費多個 patition,則沒法保證你收到的消息的順序,而一個 patition 內的消息是有序的
若是你想要對 patition 有更多的控制權,那就應該使用 SimpleConsumer API,好比:
1. 屢次讀取一個消息 2. 只消費一個 patition 中的部分消息 3. 使用事務來保證一個消息僅被消費一次
可是使用此 API 時,partition、offset、broker、leader 等對你再也不透明,須要本身去管理。你須要作大量的額外工做:
1. 必須在應用程序中跟蹤 offset,從而肯定下一條應該消費哪條消息 2. 應用程序須要經過程序獲知每一個 Partition 的 leader 是誰 3. 須要處理 leader 的變動
使用 SimpleConsumer API 的通常流程以下:
1. 查找到一個「活着」的 broker,而且找出每一個 partition 的 leader 2. 找出每一個 partition 的 follower 3. 定義好請求,該請求應該能描述應用程序須要哪些數據 4. fetch 數據 5. 識別 leader 的變化,並對之做出必要的響應
如下針對 high-level Consumer API 進行說明。
如 2.2 節所說, kafka 的分配單位是 patition。每一個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),可是多個 group 能夠同時消費這個 partition。
kafka 的設計目標之一就是同時實現離線處理和實時處理,根據這一特性,可使用 spark/Storm 這些實時處理系統對消息在線處理,同時使用 Hadoop 批處理系統進行離線處理,還能夠將數據備份到另外一個數據中心,只須要保證這三者屬於不一樣的 consumer group。以下圖所示:
圖.8
consumer 採用 pull 模式從 broker 中讀取數據。
push 模式很難適應消費速率不一樣的消費者,由於消息發送速率是由 broker 決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則能夠根據 consumer 的消費能力以適當的速率消費消息。
對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
若是將 consumer 設置爲 autocommit,consumer 一旦讀到數據當即自動 commit。若是隻討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。
但實際使用中應用程序並不是在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:
1.讀完消息先 commit 再處理消息。 這種模式下,若是 consumer 在 commit 後還沒來得及處理消息就 crash 了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於 At most once 2.讀完消息先處理再 commit。 這種模式下,若是在處理完消息以後 commit 以前 consumer crash 了,下次從新開始工做時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應於 At least once。 3.若是必定要作到 Exactly once,就須要協調 offset 和實際操做的輸出。 精典的作法是引入兩階段提交。若是能讓 offset 和操做輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,由於許多輸出系統可能不支持兩階段提交。好比,consumer 拿到數據後可能把數據放到 HDFS,若是把最新的 offset 和數據自己一塊兒寫到 HDFS,那就能夠保證數據的輸出和 offset 的更新要麼都完成,要麼都不完成,間接實現 Exactly once。(目前就 high-level API而言,offset 是存於Zookeeper 中的,沒法存於HDFS,而SimpleConsuemr API的 offset 是由本身去維護的,能夠將之存於 HDFS 中)
總之,Kafka 默認保證 At least once,而且容許經過設置 producer 異步提交來實現 At most once(見文章《kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協做,幸運的是 kafka 提供的 offset 能夠很是直接很是容易得使用這種方式。
更多關於 kafka 傳輸語義的信息請參考《Message Delivery Semantics》。
當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法以下:
1. 將目標 topic 下的全部 partirtion 排序,存於PT 2. 對某 consumer group 下全部 consumer 排序,存於 CG,第 i 個consumer 記爲 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始) 5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
在 0.8.*版本,每一個 consumer 都只負責調整本身所消費的 partition,爲了保證整個consumer group 的一致性,當一個 consumer 觸發了 rebalance 時,該 consumer group 內的其它全部其它 consumer 也應該同時觸發 rebalance。這會致使如下幾個問題:
1.Herd effect 任何 broker 或者 consumer 的增減都會觸發全部的 consumer 的 rebalance 2.Split Brain 每一個 consumer 分別單獨經過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那麼不一樣 consumer 在同一時刻從 zookeeper 看到的 view 就可能不同,這是由 zookeeper 的特性決定的,這就會形成不正確的 reblance 嘗試。 3. 調整結果不可控 全部的 consumer 都並不知道其它 consumer 的 rebalance 是否成功,這可能會致使 kafka 工做在一個不正確的狀態。
基於以上問題,kafka 設計者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance,而後又從簡便性和驗證要求兩方面考慮,計劃在 consumer 客戶端實現分配方案。(見文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此處再也不贅述。
最開始在本機搭建了kafka僞集羣,本地 producer 客戶端成功發佈消息至 broker。隨後在服務器上搭建了 kafka 集羣,在本機鏈接該集羣,producer 卻沒法發佈消息到 broker(奇怪也沒有拋錯)。最開始懷疑是 iptables 沒開放,因而開放端口,結果還不行(又開始是代碼問題、版本問題等等,倒騰了好久)。最後沒辦法,一項一項查看 server.properties 配置,發現如下兩個配置:
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = security_protocol://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
以上說的就是 advertised.listeners 是 broker 給 producer 和 consumer 鏈接使用的,若是沒有設置,就使用 listeners,而若是 host_name 沒有設置的話,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機名。
修改方法:
1. listeners=PLAINTEXT://121.10.26.XXX:9092 2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092
修改後重啓服務,正常工做。關於更多 kafka 配置說明,見文章《Kafka學習整理三(borker(0.9.0及0.10.0)配置)》。
2. 《Kafka設計解析(二):Kafka High Availability (上)》
3. 《Kafka設計解析(二):Kafka High Availability (下)》
4. 《Kafka設計解析(四):Kafka Consumer解析》
5. 《Kafka設計解析(五):Kafka Benchmark》
6. 《Kafka學習整理三(borker(0.9.0及0.10.0)配置)》
7. 《Using the High Level Consumer》
8. 《Using SimpleConsumer》
9. 《Consumer Client Re-Design》
10. 《Message Delivery Semantics》
11. 《Kafka Detailed Consumer Coordinator Design》
12. 《Kafka Client-side Assignment Proposal》
13. 《Kafka和DistributedLog技術對比》
14. 《kafka安裝和啓動》
15. 《kafka consumer防止數據丟失》
參考:
https://kafka.apache.org/documentation
http://orchome.com/kafka/index