Storm實戰:在雲上搭建大規模實時數據流處理系統(Storm+Kafka)

在大數據時代,數據規模變得愈來愈大。因爲數據的增加速度和非結構化的特性,經常使用的軟硬件工具已沒法在用戶可容忍的時間內對數據進行採集、管理和處理。本文主要介紹如何在阿里雲上使用Kafka和Storm搭建大規模消息分發和實時數據流處理系統,以及這個過程當中主要遭遇的一些挑戰。實踐主要立足創建一套汽車狀態實時監控系統,能夠在阿里雲上當即進行部署。node

 

實時大數據處理利器——Storm和Kafka

大數據時代,隨着可獲取數據的渠道增多,好比常見的電子商務、網絡、傳感器的數據流、太空數據等,數據規模也變得愈來愈大;同時,不一樣的渠道每每產生更多的數據類型,這些衍生的數據增加很是之快,規模很是之大。大數據時代各個機構可謂是坐擁金山,然而目前大數據技術的應用卻仍然存在衆多挑戰,主要出如今數據收集、存儲、處理和可視化幾個過程。web

1. 數據收集

Gartner的Merv Adrian對大數據有這樣一個定義:「大數據讓經常使用硬件軟件工具沒法在用戶可容忍時間內對數據進行採集、管理和處理。」 [1] 麥肯錫全球研究院在2011年5月也有這樣一個概念:「大數據是指超出典型數據庫軟件工具採集、存儲、管理和分析能力的數據集。」 [2] 從上面的定義能夠看出,大數據最大的挑戰在於如何在有限時間內對數據進行處理和分析,並獲得有用信息數據庫

2. 數據處理

大數據處理中最著名的工具是Hadoop,不過它並非一套實時系統。爲了解決這個問題,計算機工程師們又開發了Storm和Kafka。Apache Storm是一套開源的分佈式實時計算系統。最先由Nathan Marz [3] 開發,在被Twitter收購後開源,並在2014年9月起成爲Apache頂級開源項目。Storm被普遍用於各類商業網站,包括Twitter、Yelp、Groupon、百度、淘寶等。Storm的使用場景很是普遍,例如實時分析、在線機器學習、連續計算、分部署RPC、ETL等。Storm有着很是快的處理速度,單節點能夠達到百萬個元組每秒,此外它還具備高擴展、容錯、保證數據處理等特性。緩存

圖1是Storm的一個簡單的架構。bash

 

圖1  Storm架構服務器

Apache Kafka也是一個開源的系統,旨在提供一個統一的,高吞吐、低延遲的分佈式消息處理平臺來對實時數據進行處理。它最先由LinkedIn開發,開源於2011年並被貢獻給了Apache。Kafka區別於傳統RabbitMQ、Apache ActiveMQ等消息系統的地方主要在於:分佈式系統特性,易於擴展;爲發佈和訂閱提供高吞吐量;支持多訂閱,能夠自動平衡消費者;能夠將消息持久化到磁盤,能夠用於批量消費,例如ETL等。網絡

 

圖2  Kafka架構架構

在阿里雲上部署Storm和Kafka

咱們須要設計一個實時車輛監控系統,這個系統要將汽車駕駛過程當中實時的位置,速度,轉速,油耗以及轉速發送到系統中,從而能夠實時計算出車流量和污染物排放量。該系統的目標是要能同事支持10萬輛車同時發送消息,在最高峯能知足100萬輛車。爲了實現如此規模的消息分發和吞吐,咱們基於Kafka和Storm來設計實現。同時爲了知足高擴展性,咱們將Storm和Kafka分別部署到不一樣的服務器上,若是須要更多的計算能力,能夠隨時經過建立新的服務器的方式來完成。此外爲了知足高可用性,每臺相同功能的服務器也須要至少部署2臺,這樣一旦一臺服務器出現問題,另一臺服務器也能夠持續提供服務。併發

在實體服務器上部署Storm和Kafka等系統涉及到大量服務器集羣和軟件的安裝部署,這個過程須要花費大量時間,而云計算則很好的彌補了這一點——提供各類虛擬服務器和鏡像功能,加快基礎設施和軟件的部署過程。app

基於雲的車聯網監控系統架構


 

圖3  車聯網監控系統架構

咱們須要2臺服務器來構建Kafka代理服務器,在Storm中還須要2臺服務器來運行Spout和2個Bolt,另外在Redis層則須要2臺服務器來部署緩存,再加上2臺服務器做爲Web服務器。服務器架構圖如圖4所示。

 

圖4  車聯網監控系統架構

在部署車聯網監控系統以前,咱們首先須要在每臺服務器上部署相應的軟件,包括Git、Libzmq、Java、G++等,用於代碼編譯和相關軟件安裝。可使用SSH鏈接到相應的機器。用戶名密碼則會由阿里雲以郵件或者短消息的方式提供。

在車聯網實時監控系統中,咱們須要部署4種不一樣類型的服務器,分別是網站前臺服務器、Kafka服務器、Storm服務器和緩存服務器,以知足上面提到的高擴展性的要求。在每一種類型的服務器部署完成以後,均可以經過阿里雲鏡像的功能,建立一個能隨時使用的鏡像,這樣在擴展服務器的時候就不須要從新安裝軟件,直接經過鏡像建立服務器就能夠了。

如下命令須要在全部服務器上運行以安裝相應的軟件:

如下命令安裝在緩存服務器和Kafka服務器上:

另外,咱們還須要在Storm的服務器安裝maven和lein用於代碼編譯:

 

在Kafka服務器上安裝Kafka:

 

對於Storm和Kafka的安裝,到這一步已基本完成,接下去須要分別建立鏡像。建立鏡像的方法是先建立阿里雲快照,而後經過將快照轉換爲鏡像的方式完成。具體步驟以下:

  • 在阿里雲的管理界面選擇雲服務器,隨後選擇該服務器的磁盤列表,點擊建立快照。
  • 輸入快照名稱並確認。
  • 阿里雲會自動爲雲服務器的系統盤建立快照,當建立完成之後,會出現「建立自定義鏡像」按鈕。
  • 點擊「建立自定義鏡像」的按鈕,阿里雲就會將這個快照轉換爲鏡像,能夠在阿里雲ECS管理界面的自定義鏡像欄中看到。

圖5  自定義的鏡像

接下來,咱們經過鏡像能夠直接建立相同配置的ECS服務器。

 

圖6  從自定義鏡像中建立雲服務器

固然,在自動擴展實現上,雲服務並不須要用戶去手動執行,這裏咱們使用阿里雲的ECS REST API自動經過鏡像建立機器。能夠參考如下Python代碼,自動建立阿里雲ECS虛擬機:

基於StormKafka的車輛信息實時監控系統打造

接下來作的就是將車輛信息實時監控系統部署到系統中。這個系統演示瞭如何編寫一個Storm的Topology,從Kafka消息系統中將信息讀取出來。咱們使用Kafka的客戶端模擬從世界各地發送車輛實時信息給Kafka集羣,而後Storm Topology會把這些消息經過Bolts將座標轉換爲Json對象,而且使用GeoJSON在Bing Map上顯示車輛的實時位置、溫度、轉速以及速度等等信息。Topology還會將信息寫到Redis緩存中,而後Node.js經過socket.io讀取Redis中的信息,而且使用d3js顯示在頁面上。

首先,咱們須要編寫Kafka 生產者的部分代碼,主要是模擬讀取汽車的實時數據並向Kafka集羣進行發送,咱們實現了一個KafkaCarDataProducer類,經過配置ProducerConfig來建立一個Producer對象來發送數據。它能夠用來鏈接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代碼中咱們根據不一樣的鏈接字符串設置不一樣配置。僞代碼以下:

而後就能夠直接經過下面代碼來發送消息:

接下來咱們須要編寫3個Storm類,首先是建立Storm的Topology,這個類叫KafkaCarTopology,咱們建立了一個叫car的topic,而後定義本機一個hosts和Zookeeper hosts,最後建立一個Spout,叫作KafkaSpout,而後添加ParseCarDataBolt鏈接到KafkaSout,再建立一個RedisCarBolt,用於將結果寫入Redis緩存。最後根據參數建立3個Worker,提交Storm Topology。

在這個拓撲結構中,咱們有2個Bolt用於數據的處理,第一個叫ParserCarDataBolt,這個Bolt主要將Kafka傳出的消息轉換爲Json格式,它繼承BaseBasicBolt,在execute函數中經過collector提交數據,同時重載了declareOutputFields函數,通知下一個Bolt的數據格式。代碼以下:

數據會被寫入RedisCarBolt,再寫入到Redis緩存中。它繼承自BaseRichBolt,須要重載prepare和excute方法來處理消息元組。此外還須要重載prepare和cleanup函數,幾個關鍵的函數以下:

最後咱們還須要編寫一些Node.js的代碼,保證在頁面上經過socket.io進行通信,實時將最終數據從Redis裏面讀取出來,並在BingMap上顯示。

到此爲止,一個簡單的車輛信息實時監控系統就實現了,咱們經過bash腳本進行編譯,並安裝到相應的服務器上,好比下列代碼須要被安裝在Storm的服務器上:

有一點須要注意的是,因爲在編譯過程當中須要自動下載Storm庫,在阿里雲的國內機房的虛擬機頗有可能須要設置代理進行。設置代理的方法也很簡單,經過對lein命令增長如下參數就能夠了:http_proxy=http://URL:PORT

接着咱們在網頁上訪問http://webhostname或者運行node.js的服務器,就會看到下面的網頁,同時發現網頁將同步刷新汽車的實時位置、速度、轉速等。

 

圖7  車聯網監控系統演示頁面

對車聯網監控系統的性能測試

接下來咱們對這個系統進行了一個簡單的吞吐量測試。咱們只有1個Topic,使用5個partition、3個worker、1個Spout和2個Bolt,在一臺2核2GB的ECS上運行。咱們使用了另外4臺客戶端,每一個客戶端有4核8G內存,分別啓動40個線程不斷向這個系統實時發送汽車信息,模擬160臺汽車發送的狀況,其消息發送數量和CPU佔用率狀況如圖8所示。

 

圖8  車聯網監控系統性能分析

從圖8中能夠看出,平均每輛汽車客戶端會模擬每秒給系統發送了1000條消息,總的吞吐量達到16萬條左右,此時平均的CPU佔用率大約在30%左右。若是系統是徹底線性的,在系統CPU佔用率達到90%的狀況下,大約能處理48萬條消息。不過實際狀況中,在阿里雲ECS上,卻發現CPU達到50%之後,就再也不上升,而客戶端發送消息的延時也逐步增長。

通過分析之後發現,因爲ECS的磁盤性能沒法和物理機的SSD磁盤相比,因此在Kafka消息大量寫入磁盤的過程當中,吞吐量降低,磁盤讀寫負擔變得很是大。這時咱們增長了Kafka的Broker和Storm的Spout的數量,將消息分佈式地分發到多臺ECS上,從而實現了消息吞吐量的線性增長。

在這個系統中,咱們不推薦使用大核和大內存的機器,而推薦使用多臺2核2GB的服務器分佈式地處理消息。這也是雲計算處理大數據的原則所在,使用橫向擴展而不用縱向擴展。

結論

至此咱們介紹了利用Storm和Kafka實現大數據的實時處理,而且介紹瞭如何在雲上經過鏡像快速地建立這套系統。此外,咱們還介紹瞭如何對Storm、Kafka、Redis以及Node.js開發出一個實時的車輛信息監控系統。這個系統可以實現高性能、大吞吐量和高併發。固然,隨着大數據的快速發展,咱們相信還會有愈來愈多好的工具和產品出如今市場上,到那時咱們從大數據中獲取有效的信息將會變得更加容易和便捷。有了雲計算的幫助,開發的週期也會變得愈來愈短。

 


雲角: 

[1] 「It’s going mainstream, and it’s your next opportunity.「, Teradata Magazine, Q1, 2011http://www.teradatamagazine.com/v11n01/Features/Big-Data/

[2] 」 Big data: The next frontier for innovation, competition, and productivity」http://www.mckinsey.com/insights/business_technology/big_data_the_next_frontier_for_innovation

[3] http://nathanmarz.com/about/

相關文章
相關標籤/搜索