Apache Kafka框架學習

背景介紹java

消息隊列的比較算法

kafka框架介紹數據庫

  術語解釋緩存

  文件存儲服務器

  可靠性保證session

  高吞吐量實現負載均衡

  負載均衡框架

應用場景dom

 

背景介紹:異步

  kafka是由Apache軟件基金會維護的一個開源流處理平臺,由scala和java編寫。最先開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。如今它已被多家不一樣類型的公司做爲多種類型的數據管道和消息系統使用。

  kafka是一種分佈式的,基於發佈/訂閱的消息系統。

  kafka特色:

    快速持久化,能夠在o(1)的系統開銷下進行消息持久化;

    高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;

    徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;

    kafka經過Hadoop的並行加載機制統一了在線和離線的消息處理。

    Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。

消息隊列:

使用消息隊列的好處:

   解耦、擴展性、靈活性&峯值處理能力、可恢復性、順序保證、緩衝......

RabbitMQ

  RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP,SMTP,STOMP,也正因如此,它很是重量級。

Redis

  Redis是一個基於Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。

ZeroMQ

  ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你只須要簡單的引用ZeroMQ程序庫,就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。

ActiveMQ

  ActiveMQ是Apache下的一個子項目。相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它以少許代碼就能夠高效的實現高級應用場景。

kafka框架:

  Broker:kafka集羣包含一個或多個服務器,每一個服務器被稱爲一個broker。

  Producer:負責發佈消息到Kafka broker。

  Consumer:消息消費者,從Kafka broker讀取消息的客戶端。

  Consumer Group:每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。當多個Consumer屬於同一個Group時,它們所訂閱的消息只會發佈到該組的producer;當須要每一個Consumer都接受到消息時,能夠賦予不一樣的id。

  Topic:每條發佈到Kafka集羣的消息都有一個類別。這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)。

  Partition:Partition是物理上的概念,每一個Topic包含一個或多個Partition;建議每一個Topic的Partition數量不超過集羣中的broker數量。

  Replica:kafka從0.8版本開始引入了副本機制,目的是爲了增長Kafka的高可用性。每一個Partition會有多個副本,而且從副本集合中(Assigned Replic,AR)中選取一個副本做爲leader副本,全部的讀寫請求都由leader副本處理。剩餘的副本做爲Follower副本,Follower副本從leader副本獲取消息並更新至本身的Log中。若是leader副本所在的Broker出現故障,會從Follower副本選擇一個做爲Leader提供服務,保證Kafka的高可用性。

  Topic&Partition&Replica示意圖

 

  Topic&Partition&Replica分配算法:

  1.將全部存活的N個Brokers和待分配的Partition排序;

  2.將第i個Partition分配到第(i mod n)個Broker上;而且會做爲Partition的優先副本(這裏就基本說明了一個topic的leader partition在集羣上的大體分佈狀況);

  3.將第i個Partition的第j個Replica分配到第((i + j)mod n)個Broker上。

  假設集羣一共有4個brokers,一個topic有4個partition,每一個Partition有3個副本,下圖是每一個Broker上的副本分配狀況。

  

Kafka框架-文件存儲機制:

  kafka中的Message是以topic爲基本單位組織的,不一樣的topic之間是相互獨立的。每一個topic又能夠分紅幾個不一樣的partition(每一個topic有幾個partition是在建立topic時指定的),每一個partition存儲一部分Message。

  Topic&partition&Message關係圖:

 

  partition是以文件的形式存儲在文件系統中,好比,建立了一個名爲page_visits的topic,其有5個partition,那麼在Kafka的數據目錄中(由配置文件中的log.dirs指定的)中就有這樣5個目錄:page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名規則爲:<topic_name>-<partition_id>,裏面存儲的分別就是這5個partition的數據。

Partition的數據文件:

  Partition中的每條Message由offset來表示它在這個partition中的偏移量,不是該Message在partition數據文件中的實際存儲位置,而是邏輯上的一個值,它惟一肯定了partition中的一條Message。所以,能夠認爲offset是partition中Message的id。partition中的每條Message包含了如下三個屬性:offset、messageSize、data。其中offset爲long型,MessageSize爲int32,表示data有多大,data爲message的具體內容。

   如上描述,若是每一個Partition對應一個存儲文件,當一個Partition上存儲大量消息時,追加消息的複雜度爲o(1);查找一個消息時,須要遍歷整個文件,複雜度o(n)。

Kafka解決方案:

  分段:

    好比有100條Message,它們的offset是從0到99.假設將數據文件分紅5段,第一段爲0-19,第二段爲20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就能夠定位到該Message在哪一個段中。

  

  索引:

    Kakfa爲每一個分段後的數據文件創建了索引文件,文件名與數據文件的名字是同樣的。只是文件擴展名爲.index。index文件中並無爲數據文件中的每條Message創建索引,而是採用了稀疏存儲的方式,每隔必定字節的數據創建一條索引。這樣避免了索引文件佔用過多的空間,從而能夠將索引文件保留在內存中。

  消息查找過程:

    好比:要查找絕對offset爲7的Message:

      1.用二分查找肯定它是在那個LogSegment中:在第一個Segment中。

      2.打開這個Segment的index文件,也是用二分查找找到offset小於或者等於指定offset的索引條目中最大的那個offset。天然offset爲6的那個索引是咱們要找的,經過索引文件咱們知道offset爲6的Message在數據文件中位置爲9087.

      3.打開數據文件,從位置爲9807的那個地方開始順序掃描直到找到offset爲7的那條Message。

      這套機制是創建在offset是有序的。索引文件居中,因此查找的速度仍是挺快的。一句話,kafka的Message存儲採用了分區(partition),分段(LogSegment)映射到內存和稀疏索引這幾個手段來達到了高效性。

Kafka框架-數據可靠性保證:

Broker分析:

  對於broker,落盤的數據,除非磁盤壞了,通常不會丟的。

  對於內存髒(沒有flush磁盤)數據,broker重啓會丟,能夠經過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的數據多些。

  Replica機制:是否使用replica取決於在可靠性和資源代價之間的平衡。

Consumer從Broker拉取消息:

  Kafka中有兩種consumer接口,分別爲Low-level API和High-level API

  (1).Low-level API SimpleConsumer,這套接口比較複雜的,使用者必需要考慮不少事情,優勢就是對Kafka能夠有徹底的控制。

  (2).High-level API 使用比較簡單,已經封裝了partition和offset的管理,默認是會按期自動commit offset,這樣可能會丟數據,由於consumer可能拿到數據沒有處理完crash。High-level API接口的特色,自動管理,使用簡單,可是對Kafka的控制不夠靈活。

  一種很是經常使用的選舉leader的方式是「majority vote」(「少數服從多數」),但Kafka並未採用這種方式。這種模式下,若是咱們有2f+1個replica(包含leader和follower),那在commit以前必須保證有f+1個replica複製完消息,爲了確保正確選出新的leader,fail的replica不能超過f個。由於在剩下的任意f+1個replica裏,至少有一個replica包含有最新的全部消息。這種方式有個很大的優點,系統的latency只取決於最快的幾臺sever,也就是說,若是replication factor是3,那latency就取決於最快的那個follower而非最慢那個。majority vote也有一些劣勢,爲了保證leader election的正常進行,它所能容忍的fail的follower個數比較少。若是要容忍1個follower掛掉,必需要有3個以上的replica,若是要容忍2個follower掛掉,必需要有5個以上的replica。也就是說,在生產環境下爲了保證較高的容錯程度,必需要有大量的replica,而大量的replica又會在大數據量下致使性能急劇降低。這就是這種算法更多用在Zookeeper這種共享集羣配置的系統中而不多在須要存儲大量數據的系統中使用的緣由。例如HDFS的HA feature是基於majority-vote-based journal,可是它的數據存儲並無使用這種expensive的方式。

如何肯定一個Broker是否還活着?

  1.它必須維護與Zookeeper的session(這個經過Zookeeper的Heartbeat機制來實現)。

  2.Follower必須可以及時將Leader的消息複製過來,不能「落後太多」。

  Leader會跟蹤與其保持同步的Replica列表,該列表稱爲ISR(即in-sync Replica)。若是一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裏所描述的「落後太多「指Follower複製的消息落後於Leader後的條數超過預約值(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.max.messages配置,其默認值是4000)或者Follower超過必定時間(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。

  這裏的複製機制既不是同步複製,也不是單純的異步複製。事實上,同步複製要求」活着的」follower都複製完,這條消息纔會被認爲commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka很是重要的一個特性)。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認爲已經commit,這種狀況下若是follower都落後於leader,而leader忽然宕機,則會丟失數據。而Kafka的這種使用「in sync」 list的方式則很好的均衡了確保數據不丟失以及吞吐率。follower能夠批量的從leader複製數據,這樣極大的提升複製性能(批量寫磁盤),極大減小了follower與leader的差距(前文又說到,只要follower落後leader不太遠,則被認爲在「in sync」 list裏)。

接受數據的可靠性保證:

  當producer向leader發送數據時,request.required.acks參數來設置可靠性的級別:

  1(默認):producer在ISR中的leader已成功收到的數據並獲得確認後發送下一條message。若是leader宕機了,則會丟失數據。

  0:producer無需等待來自broker的確認而繼續發送下一批消息。這種狀況下數據傳輸效率最高,可是數據可靠性倒是最低的。

  -1:producer須要等待ISR中的全部follower都確認接受到數據後纔算一次發送完成,可靠性最高。可是這樣也不能保證數據不丟失,好比當ISR中只有leader時(前面ISR那一節講到,ISR中的成員因爲某些狀況會增長也會減小,最少就剩一個leader),這樣就變成了acks=1的狀況。

接受數據可靠性保證:

  若是要提升數據的可靠性,在設置request.required.acks=-1的同時,也要min.insync.replicas這個參數(能夠在broker或者topic層面進行設置)的配合,這樣才能發揮最大的功效。

  min.insync.replicas這個參數設定ISR中的最小副本數是多少,默認爲1,當且僅當request.required.acks參數設置爲-1時,此參數才生效。若是ISR中的副本數少於min.insync.replicas配置的數量時,客戶端會返回異常。

  request.required.acks=-1,同步(Kafka默認爲同步,即producer.type=sync)的發送模式。

  replication.factor>=2且min.insvnc.replicas>=2的狀況下,不會丟失數據。注:Kafka只處理fail/recover問題,不處理Byzantine問題。

     

  圖4中若是選舉後一個爲leader,則前一個partition的HW不會更新,新消息繼續從offset爲5的地方存儲;此時,producer沒有收到ack消息,會繼續發送消息四、5,此時產生重複;kafka不解決,有用戶本身結局,好比在消息中添加全局Key。

Kafka框架-高吞吐量:

  順序讀/寫文件、批量消息傳遞、數據壓縮、Kakfa的消息存儲在OS pagecache(頁緩存,pagecache的大小爲一頁,一般爲4K,在Linux讀寫文件時,它用於緩存文件的邏輯內容,從而加快對磁盤上影像和數據的訪問)、Topic分爲多個Partition,多個Parttition同時提供服務。

Kafka框架-負載均衡:

  producer根據用戶指定的算法,將消息發送到指定的partition;

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

  Partition&replica均衡:存在多個partition,每一個partition有本身的replica,每一個replica分佈在不一樣的Broker節點上;

 

leader均衡:

  每當Leader Partition中止或崩潰領導,由其餘副本取代Leader地位。這意味着默認狀況下,當該Partition更新啓動時,它將只是做爲跟隨着,不會用於客戶端讀取和寫入,出現於其它Leader在同一個Broker的狀況。

  爲了不這種不平衡,Kafka有一個優先副本的概念。若是分區的副本的列表爲1,5,9,則節點1優選爲節點5或9的引導者,由於它在副本列表中較早。

  設置auto.leader.rebalance.enable = true便可實現上述操做;

  等待ISR中的任一個replica「活」過來,而且選它爲leader。選擇第一個「活」過來的replica(不必定是ISR中的)做爲leader。

  這就須要在可用性和一致性當中作出一個簡單的平衡。若是必定要等待ISR中的replica「活」過來,那不可用的時間就可能會行對較長。並且若是ISR中的全部replica都沒法「活」過來了,或者數據都丟失了,這個partition將永遠不可用。選擇第一個「活」過來的replcia做爲leader,而這個replica不是ISR中的replica,那即便它並不保證已經包含了全部已commit的消息,他也會成爲leader而做爲comsumer的數據源(前文有說明,全部讀寫都由leader完成)。Kafka0.8.×使用了第二種方式。根據Kafka的文檔,在之後的版本中,Kafka支持用戶經過配置選擇這兩種方式中的一種,從而根據不一樣的使用場景選擇高可用性仍是強一致性。

Consumer 均衡:

  Kafka保證的是穩定狀態下每個Consumer實例只會消費某一個或多個特定Partition的數據。而某個Partition的數據只會被某一個特定的Consumer實例所消費。也就是說Kafka對消息的分配是以Partition爲單位分配的,而非以每一條消息做爲分配單元。這樣設計的劣勢是沒法保證同一個Consumer Group裏的Consumer均勻消費數據,優點是每一個Consumer不用都跟大量的Broker通訊,減小通訊開銷,同時也下降了分配難度,保證每一個Partition裏的數據能夠被有序消費。

 

Consumer均衡方法:

  若是某Consumer Group中Consumer(每一個Consumer只建立1個MessageStream)數量少於Partition數量,則至少有一個Consumer會消費多個Partition的數據;若是Consumer的數量和Partition數量相同,則正好一個Consumer消費一個Partition的數據。而若是Consumer的數量多於Partition的數量時,會有部分Consumer沒法消費該Topic下任何一條消息。當添加、刪除Consumer時,會觸發Consumer的Rebalance算法,從新分配每一個Consumer消費的Partition。

  Consumer Rebalance的算法以下:

 Kafka框架-應用場景:

  消息隊列:比起大多數的消息系統來講,Kafka有更好的吞吐量,內置的分區,冗餘及容錯性,這讓Kafka成爲了一個很好的大規模消息處理應用的解決方案。

  網站活動跟蹤:跟蹤用戶瀏覽頁面、搜索以及其餘行爲,以發佈-訂閱的模式實時記錄到對應的topic裏。再作進一步的實時處理,或實時監控,或放到hadoop/離線數據倉庫裏處理。

  日誌收集:服務器上收集日誌文件,抽象成一個個日誌或事件的信息流,Kafka處理過程延遲低,更容易支持多數據源和分佈式數據處理。

  流處理:保存收集流數據,以提供以後對接的Storm或其餘流式計算框架進行處理。不少用戶將那些從原始topic來的數據進行階段性處理、彙總、擴充或者以其餘的方式轉換到新的topic下再繼續後面的處理。

相關文章
相關標籤/搜索