Apache Kafka:下一代分佈式消息系統

簡介

Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,以後成爲Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。html

Apache Kafka與傳統消息系統相比,有如下不一樣:java

  • 它被設計爲一個分佈式系統,易於向外擴展;node

  • 它同時爲發佈和訂閱提供高吞吐量;linux

  • 它支持多訂閱者,當失敗時能自動平衡消費者;ios

  • 它將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。git

本文我將重點介紹Apache Kafka的架構、特性和特色,幫助咱們理解Kafka爲什麼比傳統消息服務更好。程序員

我將比較Kafak和傳統消息服務RabbitMQ、Apache ActiveMQ的特色,討論一些Kafka優於傳統消息服務的場景。在最後一節,咱們將探討一個進行中的示例應用,展現Kafka做爲消息服務器的用途。這個示例應用的完整源代碼在GitHub。關於它的詳細討論在本文的最後一節。github

架構

首先,我介紹一下Kafka的基本概念。它的架構包括如下組件:算法

  • 話題(Topic)是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。apache

  • 生產者(Producer)是可以發佈消息到話題的任何對象。

  • 已發佈的消息保存在一組服務器中,它們被稱爲代理(Broker)或Kafka集羣

  • 消費者能夠訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發佈的消息。

圖1:Kafka生產者、消費者和代理環境

生產者能夠選擇本身喜歡的序列化方法對消息內容編碼。爲了提升效率,生產者能夠在一個發佈請求中發送一組消息。下面的代碼演示瞭如何建立生產者併發送消息。

生產者示例代碼:

producer = new Producer(…); 
message = new Message(「test message str」.getBytes()); 
set = new MessageSet(message); 
producer.send(「topic1」, set);

爲了訂閱話題,消費者首先爲話題建立一個或多個消息流。發佈到該話題的消息將被均衡地分發到這些流。每一個消息流爲不斷產生的消息提供了迭代接口。而後消費者迭代流中的每一條消息,處理消息的有效負載。與傳統迭代器不一樣,消息流迭代器永不中止。若是當前沒有消息,迭代器將阻塞,直到有新的消息發佈到該話題。Kafka同時支持點到點分發模型(Point-to-point delivery model),即多個消費者共同消費隊列中某個消息的單個副本,以及發佈-訂閱模型(Publish-subscribe model),即多個消費者接收本身的消息副本。下面的代碼演示了消費者如何使用消息。

消費者示例代碼:

streams[] = Consumer.createMessageStreams(「topic1」, 1) 
for (message : streams[0]) { 
bytes = message.payload(); 
// do something with the bytes 
}

Kafka的總體架構如圖2所示。由於Kafka內在就是分佈式的,一個Kafka集羣一般包括多個代理。爲了均衡負載,將話題分紅多個分區,每一個代理存儲一或多個分區。多個生產者和消費者可以同時生產和獲取消息。

圖2:Kafka架構

Kafka存儲

Kafka的存儲佈局很是簡單。話題的每一個分區對應一個邏輯日誌。物理上,一個日誌爲相同大小的一組分段文件。每次生產者發佈消息到一個分區,代理就將消息追加到最後一個段文件中。當發佈的消息數量達到設定值或者通過必定的時間後,段文件真正寫入磁盤中。寫入完成後,消息公開給消費者。

與傳統的消息系統不一樣,Kafka系統中存儲的消息沒有明確的消息Id。

消息經過日誌中的邏輯偏移量來公開。這樣就避免了維護配套密集尋址,用於映射消息ID到實際消息地址的隨機存取索引結構的開銷。消息ID是增量的,但不連續。要計算下一消息的ID,能夠在其邏輯偏移的基礎上加上當前消息的長度。

消費者始終從特定分區順序地獲取消息,若是消費者知道特定消息的偏移量,也就說明消費者已經消費了以前的全部消息。消費者向代理髮出異步拉請求,準備字節緩衝區用於消費。每一個異步拉請求都包含要消費的消息偏移量。Kafka利用sendfile API高效地從代理的日誌段文件中分發字節給消費者。

圖3:Kafka存儲架構

Kafka代理

與其它消息系統不一樣,Kafka代理是無狀態的。這意味着消費者必須維護已消費的狀態信息。這些信息由消費者本身維護,代理徹底無論。這種設計很是微妙,它自己包含了創新。

  • 從代理刪除消息變得很棘手,由於代理並不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當消息在代理中超過必定時間後,將會被自動刪除。

  • 這種創新設計有很大的好處,消費者能夠故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證實是許多消費者的基本特徵。

ZooKeeper與Kafka

考慮一下有多個服務器的分佈式系統,每臺服務器都負責保存數據,在數據上執行操做。這樣的潛在例子包括分佈式搜索引擎、分佈式構建系統或者已知的系統如Apache Hadoop。全部這些分佈式系統的一個常見問題是,你如何在任一時間點肯定哪些服務器活着而且在工做中。最重要的是,當面對這些分佈式計算的難題,例如網絡失敗、帶寬限制、可變延遲鏈接、安全問題以及任何網絡環境,甚至跨多個數據中心時可能發生的錯誤時,你如何可靠地作這些事。這些正是Apache ZooKeeper所關注的問題,它是一個快速、高可用、容錯、分佈式的協調服務。你可使用ZooKeeper構建可靠的、分佈式的數據結構,用於羣組成員、領導人選舉、協同工做流和配置服務,以及廣義的分佈式數據結構如鎖、隊列、屏障(Barrier)和鎖存器(Latch)。許多知名且成功的項目依賴於ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache Blur(Incubating)和Accumulo。

ZooKeeper是一個分佈式的、分層級的文件系統,能促進客戶端間的鬆耦合,並提供最終一致的,相似於傳統文件系統中文件和目錄的Znode視圖。它提供了基本的操做,例如建立、刪除和檢查Znode是否存在。它提供了事件驅動模型,客戶端能觀察特定Znode的變化,例如現有Znode增長了一個新的子節點。ZooKeeper運行多個ZooKeeper服務器,稱爲Ensemble,以得到高可用性。每一個服務器都持有分佈式文件系統的內存複本,爲客戶端的讀取請求提供服務。

圖4:ZooKeeper Ensemble架構

上圖4展現了典型的ZooKeeper ensemble,一臺服務器做爲Leader,其它做爲Follower。當Ensemble啓動時,先選出Leader,而後全部Follower複製Leader的狀態。全部寫請求都經過Leader路由,變動會廣播給全部Follower。變動廣播被稱爲原子廣播

Kafka中ZooKeeper的用途:正如ZooKeeper用於分佈式系統的協調和促進,Kafka使用ZooKeeper也是基於相同的緣由。ZooKeeper用於管理、協調Kafka代理。每一個Kafka代理都經過ZooKeeper協調其它Kafka代理。當Kafka系統中新增了代理或者某個代理故障失效時,ZooKeeper服務將通知生產者和消費者。生產者和消費者據此開始與其它代理協調工做。Kafka總體系統架構如圖5所示。

圖5:Kafka分佈式系統的整體架構

Apache Kafka對比其它消息服務

讓咱們瞭解一下使用Apache Kafka的兩個項目,以對比其它消息服務。這兩個項目分別是LinkedIn和個人項目:

LinkedIn的研究

LinkedIn團隊作了個實驗研究,對比Kafka與Apache ActiveMQ V5.4和RabbitMQ V2.4的性能。他們使用ActiveMQ默認的消息持久化庫Kahadb。LinkedIn在兩臺Linux機器上運行他們的實驗,每臺機器的配置爲8核2GHz、16GB內存,6個磁盤使用RAID10。兩臺機器經過1GB網絡鏈接。一臺機器做爲代理,另外一臺做爲生產者或者消費者。

生產者測試

LinkedIn團隊在全部系統中配置代理,異步將消息刷入其持久化庫。對每一個系統,運行一個生產者,總共發佈1000萬條消息,每條消息200字節。Kafka生產者以1和50批量方式發送消息。ActiveMQ和RabbitMQ彷佛沒有簡單的辦法來批量發送消息,LinkedIn假定它的批量值爲1。結果以下面的圖6所示:

圖6:LinkedIn的生產者性能實驗結果

Kafka性能要好不少的主要緣由包括:

  • Kafka不等待代理的確認,以代理能處理的最快速度發送消息。

  • Kafka有更高效的存儲格式。平均而言,Kafka每條消息有9字節的開銷,而ActiveMQ有144字節。其緣由是JMS所需的沉重消息頭,以及維護各類索引結構的開銷。LinkedIn注意到ActiveMQ一個最忙的線程大部分時間都在存取B-Tree以維護消息元數據和狀態。

消費者測試

爲了作消費者測試,LinkedIn使用一個消費者獲取總共1000萬條消息。LinkedIn讓全部系統每次拉請求都預獲取大約相同數量的數據,最多1000條消息或者200KB。對ActiveMQ和RabbitMQ,LinkedIn設置消費者確認模型爲自動。結果如圖7所示。

圖7:LinkedIn的消費者性能實驗結果

Kafka性能要好不少的主要緣由包括:

  • Kafka有更高效的存儲格式;在Kafka中,從代理傳輸到消費者的字節更少。

  • ActiveMQ和RabbitMQ兩個容器中的代理必須維護每一個消息的傳輸狀態。LinkedIn團隊注意到其中一個ActiveMQ線程在測試過程當中,一直在將KahaDB頁寫入磁盤。與此相反,Kafka代理沒有磁盤寫入動做。最後,Kafka經過使用sendfile API下降了傳輸開銷。

目前,我正在工做的一個項目提供實時服務,從消息中快速並準確地提取場外交易市場(OTC)訂價內容。這是一個很是重要的項目,處理近25種資產類別的財務信息,包括債券、貸款和ABS(資產擔保證券)。項目的原始信息來源涵蓋了歐洲、北美、加拿大和拉丁美洲的主要金融市場領域。下面是這個項目的一些統計,說明了解決方案中包括高效的分佈式消息服務是多麼重要:

  • 天天處理的消息數量超過1,300,000

  • 天天解析的OTC價格數量超過12,000,000

  • 支持超過25種資產類別;

  • 天天解析的獨立票據超過70,000

消息包含PDF、Word文檔、Excel及其它格式。OTC訂價也可能要從附件中提取。

因爲傳統消息服務器的性能限制,當處理大附件時,消息隊列變得很是大,咱們的項目面臨嚴重的問題,JMSqueue一天須要啓動2-3次。重啓JMS隊列可能丟失隊列中的所有消息。項目須要一個框架,不論解析器(消費者)的行爲如何,都可以保住消息。Kafka的特性很是適用於咱們項目的需求。

當前項目具有的特性:

  1. 使用Fetchmail獲取遠程郵件消息,而後由Procmail過濾並處理,例如單獨分發基於附件的消息。

  2. 每條消息從單獨的文件獲取,該文件被處理(讀取和刪除)爲一條消息插入到消息服務器中。

  3. 消息內容從消息服務隊列中獲取,用於解析和提取信息。

示例應用

這個示例應用是基於我在項目中使用的原始應用修改後的版本。我已經刪除日誌的使用和多線程特性,使示例應用的工件儘可能簡單。示例應用的目的是展現如何使用Kafka生產者和消費者的API。應用包括一個生產者示例(簡單的生產者代碼,演示Kafka生產者API用法併發布特定話題的消息),消費者示例(簡單的消費者代碼,用於演示Kafka消費者API的用法)以及消息內容生成API(在特定路徑下生成消息內容到文件的API)。下圖展現了各組件以及它們與系統中其它組件間的關係。

圖8:示例應用組件架構

示例應用的結構與Kafka源代碼中的例子程序類似。應用的源代碼包含Java源程序文件夾‘src’和'config'文件夾,後者包括幾個配置文件和一些Shell腳本,用於執行示例應用。要運行示例應用,請參照ReadMe.md文件或GitHub網站Wiki頁面的說明。

程序構建可使用Apache Maven,定製也很容易。若是有人想修改或定製示例應用的代碼,有幾個Kafka構建腳本已通過修改,可用於從新構建示例應用代碼。關於如何定製示例應用的詳細描述已經放在項目GitHub的Wiki頁面

如今,讓咱們看看示例應用的核心工件。

Kafka生產者代碼示例

/** 
 * Instantiates a new Kafka producer. 
 * 
 * @param topic the topic 
 * @param directoryPath the directory path 
 */ 
public KafkaMailProducer(String topic, String directoryPath) { 
       props.put("serializer.class", "kafka.serializer.StringEncoder"); 
       props.put("metadata.broker.list", "localhost:9092"); 
       producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); 
       this.topic = topic; 
       this.directoryPath = directoryPath; 
} 

public void run() { 
      Path dir = Paths.get(directoryPath); 
      try { 
           new WatchDir(dir).start(); 
           new ReadDir(dir).start(); 
      } catch (IOException e) { 
           e.printStackTrace(); 
      } 
}

上面的代碼片段展現了Kafka生產者API的基本用法,例如設置生產者的屬性,包括髮布哪一個話題的消息,可使用哪一個序列化類以及代理的相關信息。這個類的基本功能是從郵件目錄讀取郵件消息文件,而後做爲消息發佈到Kafka代理。目錄經過java.nio.WatchService類監視,一旦新的郵件消息Dump到該目錄,就會被當即讀取並做爲消息發佈到Kafka代理。

Kafka消費者代碼示例

public KafkaMailConsumer(String topic) { 
       consumer = 
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); 
       this.topic = topic; 
} 

/** 
 * Creates the consumer config. 
 * 
 * @return the consumer config 
 */ 
private static ConsumerConfig createConsumerConfig() { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", KafkaMailProperties.zkConnect); 
      props.put("group.id", KafkaMailProperties.groupId); 
      props.put("zookeeper.session.timeout.ms", "400"); 
      props.put("zookeeper.sync.time.ms", "200"); 
      props.put("auto.commit.interval.ms", "1000"); 
      return new ConsumerConfig(props); 
} 

public void run() { 
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
      topicCountMap.put(topic, new Integer(1)); 
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
      KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
      ConsumerIterator<byte[], byte[]> it = stream.iterator();
      while (it.hasNext()) 
      System.out.println(new String(it.next().message())); 
}

上面的代碼演示了基本的消費者API。正如咱們前面提到的,消費者須要設置消費的消息流。在Run方法中,咱們進行了設置,並在控制檯打印收到的消息。在個人項目中,咱們將其輸入到解析系統以提取OTC訂價。

在當前的質量保證系統中,咱們使用Kafka做爲消息服務器用於概念驗證(Proof of Concept,POC)項目,它的總體性能優於JMS消息服務。其中一個咱們感到很是興奮的特性是消息的再消費(re-consumption),這讓咱們的解析系統能夠按照業務需求從新解析某些消息。基於Kafka這些很好的效果,咱們正計劃使用它,而不是用Nagios系統,去作日誌聚合與分析。

總結

Kafka是一種處理大量數據的新型系統。Kafka基於拉的消費模型讓消費者以本身的速度處理消息。若是處理消息時出現了異常,消費者始終能夠選擇再消費該消息。

關於做者

Abhishek Sharma是金融領域產品的天然語言處理(NLP)、機器學習和解析程序員。他爲多個公司提供算法設計和解析開發。Abhishek的興趣包括分佈式系統、天然語言處理和使用機器算法進行大數據分析。

 

查看英文原文:Apache Kafka: Next Generation Distributed Messaging System

相關文章
相關標籤/搜索