Spring XD簡介:大數據應用的運行時環境

簡介

Spring XD(eXtreme Data,極限數據)是Pivotal的大數據產品。它結合了Spring BootGrails,組成Spring IO平臺的執行部分。儘管Spring XD利用了大量現存的Spring項目,但它是一種運行時環境,而不是一個類庫或者框架,它包含帶有服務器的bin目錄,你能夠經過命令行啓動並與之交互。運行時能夠運行在開發機上、客戶端本身的服務器上、AWS EC2上或者Cloud Foundry上。html

Spring XD中的關鍵組件是管理和容器服務器(Admin and Container Servers)。使用一種DSL,你能夠把所需處理任務的描述經過HTTP提交給管理服務器。而後管理服務器會把處理的任務映射處處理模塊(每一個模塊都是一個執行單元,做爲Spring應用程序上下文實現)中。java

該產品具備兩種操做模式:-single和multi-node。第一種是由單獨的進程負責全部處理和管理的工做。這對於入門頗有用,一樣適合於應用程序的快速開發和測試。本文中的全部實例都被設計爲在單一節點模式下工做。第二種是一種分佈式模式。分佈式集成運行時(Distributed Integration Runtime,DIRT)會在多個節點之間分發處理的任務。除了能夠擁有VM或者物理服務器做爲這些節點以外,Spring XD還讓你能夠在Hadoop YARN集羣上運行。node



XD管理服務器會把處理的任務切分紅彼此獨立的模塊定義,並把每一個模塊分配給使用Apache ZooKeeper的容器實例。每一個容器都會監聽分配給它的模塊定義,而後部署模塊,建立Spring應用程序上下文來運行它。須要注意的是,在我撰寫這篇文章的時候,Spring XD中還不會自帶Zookeeper。兼容的版本是3.4.6,你能夠從這裏下載。react

模塊經過使用配置好的消息中間件傳遞消息來共享數據。傳輸層是可插拔的,而且支持其餘兩種Pivotal項目——RedisRabbit MQ——以及現成可用的內存數據庫。git

用例

下圖讓你能夠對Spring XD有個整體上的瞭解。github


Spring XD團隊認爲,對於建立大數據解決方案來講,建立的主要用例有四種:數據吸納、實時分析、工做流調度以及導出。redis

數據吸納提供了一種能力,能夠從各類輸入源接收數據,並把它傳輸給大數據存儲庫,像HDFS(Hadoop文件系統)、Splunk或者MPP數據庫。和文件同樣,數據源可能包括來自於移動設備、支持MQ遙感傳輸協議(MQTT)的傳感器以及像Twitter之類的社交流的事件。算法

吸納過程會貫穿事件驅動數據的處理,以及針對其餘類型數據的批處理(MR、PIG、Hive、Cascading、SQL等等)。流和做業的兩個世界大相徑庭,可是Spring XD試圖使用通道抽象(channel abstraction)來模糊兩者之間的邊界,從而讓流能夠觸發批處理做業,而批處理做業也能夠發送事件從而觸發其餘流。spring

對於流來講,會經過叫作「Taps」的抽象來支持某些實時分析,像獲取指標和計數值。從概念上,Taps讓你能夠介入到流中,執行實時分析,並有選擇地爲外部系統生成數據,像GemFire、Redis或者其餘內存數據網格。shell

一旦你在大數據倉庫中擁有數據,那麼就須要某種工做流工具來對處理進行調度。調度很是必要,由於你編寫的腳本或者map-reduce做業一般會長時間運行,並採用帶有多個步驟的事件鏈的方式。理想情況下,你須要在事件失敗的時候,可以從特定的步驟從新啓動,而不是徹底從頭來過。

最後還須要導出步驟,從而把數據放到更適合展示的系統中,可能還會作進一步的分析。例如從HDFS到RDBMS(關係型數據庫管理系統),在那裏你可使用更爲傳統的商業智能工具。

Spring XD想要提供一種統1、分佈式和可擴展的服務來知足這些用例。它沒有從頭開始,而是利用了大量已經存在的Spring技術。例如,它使用了Spring Batch來支持工做流調度和導出用例,使用Spring Integration來支持流處理,此外還使用了各類各樣的企業應用程序集成模式。其餘關鍵的Spring產品包括:使用Spring Data處理NoSQL/Hadoop工做,使用Reactor爲編寫異步程序提供簡化的API,特別是在使用LMAX Disruptor的時候。

安裝Spring XD

在接下來的部分,咱們會詳細看一下每一個用例。你可能想要本身來試驗一下這些例子。起步很是簡單。

爲了開始,你要確保系統至少安裝了Java JDK 6或者更新的版本。我推薦使用Java JDK 7。

對於OSX用戶,若是尚未Homebrew的話,請安裝,而後運行:

brew tap pivotal/tapbrew install springxd

這會安裝到 /usr/local/Cellar/springxd/1.0.0.M7/libexec (依賴於Spring XD的庫)。

注意:若是你隨後想要安裝更新的版本,那麼使用brew upgrade springXD就能夠。

紅帽或者CentOS的用戶可使用Yum來安裝。

Windows用戶能夠下載最新的.zip文件,解壓,安裝到文件夾,而後把XD_HOME這個環境變量設置成安裝文件夾。

你能夠經過鍵入如下命令,從而在單一節點上啓動Spring XD:

xd-singlenode

鍵入如下命令來打開另外一個終端窗口並啓動shell程序:

xd-shell

你會看到下面這樣的狀況:

爲了檢查它是否正常工做,讓咱們建立一個快速的流:

stream create --definition "time | log" --name ticktock --deploy

在你啓動Spring XD的控制檯中,你會看到下面這樣的顯示:

你能夠從shell中使用stream destroy命令刪除流。

stream destroy --name ticktock數據吸納流

在Spring XD中,基本的流會定義事件驅動數據的吸納,從源到目的地,通過任意多個處理器。

Spring XD外殼程序支持針對流定義的一種DSL,其中帶有管道和過濾器語法 - source | processor | sink。

例如,像這樣的命令 stream create --name filetest --definition "file | log" --deploy會記錄文件內容的日誌。
除了可以處理文件以外,Spring XD還支持不少其餘源,包括:

HTTP

命令 HTTP POST /streams/myStream "http | file --deploy" -表示「從HTTP消費個人流,並轉到文件」。這會默認到9000端口。你可使用--port選項覆蓋默認的端口設置。這是針對HTTP的惟一參數。

例如(從XD的外殼程序):

xd:> stream create --name infoqhttptest9010 --definition "http --port=9010 | file" --deploy

你能夠向這個新端口提交一些數據來測試:

xd:> http post --target http://localhost:9010 --data "hello world"

你會在控制檯窗口看到如下文本:

> POST (text/plain;Charset=UTF-8) http://localhost:9010 hello world > 200 OK

打開另外一個終端窗口並鍵入:

$ cd /tmp/xd/output $ tail -f infoqhttptest9010.out

你會在輸出中看到「hello world」。

想要發送二進制數據,你須要把Content-Type頭部說明設置爲application/octet-string:

$ curl --data-binary @foo.zip -H'Content-Type: application-octet-string' http://localhost:9000

鍵入 stream destroy infoqhttptest9010 來完成清理工做。

Mail

Mail是用來接收email的源模塊。根據所使用的協議,它能夠以池的形式工做,或者在可用的時候就接收email。

例如:

xd:> stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles@c4media.com --password=secret --delete=false | file" --deploy

注意:這裏的delete選項很重要,由於對於Spring XD來講一旦被消費,默認狀況就會刪除電子郵件。Spring XD也擁有markAsRead選項,但默認值是false。Spring集成文檔中對此作出了詳細的說明,但主要問題是,POP3協議只知道在單獨一個會話中讀取了什麼。做爲POP3郵件適配器運行的結果,當郵件在每一個池中變成可用狀態時,就會被成功發送,且沒有任何一個郵件消息會被屢次發送。然而,當你重啓適配器並開始新的會話時,全部位於上一個會話中已經獲取過的郵件消息就可能會被再次獲取。

若是你在控制檯日誌中看到這樣的錯誤信息:

WARN task-scheduler-1 org.springframework.integration.mail.ImapIdleChannelAdapter:230 - error occurred in idle task javax.mail.AuthenticationFailedException: failed to connect, no password specified?

試着在你的URL把@符號替換爲URL編碼的樣子: %40:

stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles%40c4media.com --password=secret --delete=false | file" --deploy

打開另外一個終端窗口並鍵入:

$ cd /tmp/xd/output $ tail -f infoqmailstream.out

給你本身發送一封郵件,以看到它在日誌文件中顯示的內容。

Twitter搜索

Spring XD就可使用Twitter搜索API(twittersearch),也可使用來自於Twitter's Streaming API的數據。

例如:

xd:> stream create --name twittersearchinfoq --definition "twittersearch --outputType=application/json --fixedDelay=1000 --consumerKey=afes2uqo6JAuFljdJFhqA --consumerSecret=0top8crpmd1MXGEbbgzAwVJSAODMcbeAbhwHXLnsg --query='infoq' | file" --deploy

它使用twittersearch的JSON輸出格式,每1000毫秒使用令牌「infoq」在Twitter中進行查詢。爲了運行上面的內容,你須要一個消費者密鑰(由Twitter發放的應用程序消費者密鑰)以及它相關的密鑰。


它的結果會經過管道以同步的方式傳輸給一個文件,默認是/tmp/xd/output/[streamName].out。

打開另外一個終端窗口並鍵入:

$ cd /tmp/xd/output $ tail -f twittersearchjava.out

稍等一下子,你會發現超出了Twitter APE搜索的限制,而且會在控制檯窗口中(你在其中在單一節點上啓動了XD)看到這樣的消息:

11:27:01,468 WARN task-scheduler-1 client.RestTemplate:581 - GET request for "https://api.twitter.com/1.1/search/tweets.json?q=infoq&count=20&since_id=478845525597237248" resulted in 429 (Client Error (429)); invoking error handler11:27:01,471 ERROR task-scheduler-1 handler.LoggingHandler:145 - org.springframework.social.RateLimitExceededException: The rate limit has been exceeded.

鍵入 stream destroy twittersearchinfoq 來完成清理工做。

其餘輸入流

GemFire:在XD容器進程中配置一個緩存(cache)和副本區域,它和Spring Integration GemFire同時存在於通道適配器中,它們由CacheListener支持,然後者會輸出區域中外部輸入事件所觸發的輸出消息。它還支持連續的查詢,那讓客戶端應用程序可使用對象查詢語言(OQL)來建立GemFire查詢,並註冊一個CQ監聽器,它會訂閱查詢,每次查詢的結果集發生變化的時候都會獲得通知。

Reactor IP:它會做爲服務器,讓遠程的組織可以鏈接到XD,並經過原生的TCP或者UDP socket提交數據。reactor-ip源和標準的tcp源的區別在於,它基於Reactor項目,能夠被配置爲使用LMAX Disruptor RingBuffer庫,它可以容許極高的吸納率,大概每秒1M。

Syslog:有三種syslog源:reactor-syslog、syslog-udp和syslog-tcp。reactor-syslog適配器使用tcp,會構建Reactor項目中可用的功能,並提供超過syslog-tcp適配器中更好的吞吐量。

TCP:它會做爲服務器,讓遠程的組織可以鏈接到XD,並經過原生的TCP socket提交數據。

MQTT:鏈接到MQTT服務器並接收遙測消息。

Taps

在流的任意位置,你均可以插入tap——這個詞來自於Gregor Hohpe等人著的《應用程序集成模式(Application Integration Patterns)》一書中的「wire tap」模式。

從概念上說,你會在通道中插入一個簡單的接收列表,它會把每一個進入的消息發佈到主通道和次通道中。流並不知道它的管道中任何tap的存在。刪除流並不會自動刪除tap——它們須要單獨刪除。然而,若是加入了tap的流被從新建立,那麼已經存在的tap會繼續起做用。

tap能夠在流的任意位置(或者多個位置)插入。

處理器

流中的數據能夠以多種方式處理:

過濾器:它能夠用於決定消息是否應該發送給輸出通道。最簡單的狀況是,過濾器只是一個SpEL布爾表達式,它會返回真或假。例如:

xd:> stream create --name filtertest --definition "http | filter --expression=payload=='good' | log" --deploy

會記錄帶有「good」關鍵字的全部內容的日誌。然而,過濾器也能夠至關複雜。Spring XD支持JSONPath計算式以及自定義的Groovy腳本。

轉換:用來轉換消息的內容或結構。它支持簡單的SpEL,對於更復雜的轉換,可使用Groovy腳本。

分割器:和Spring集成中的分割器概念相似,這裏的分割器會使用SpEL表達式,它會計算一個數組或者集合的值,從而把單獨一條消息切分紅多個獨立的消息。你可使用JSON oath表達式,但沒法使用自定義的Groovy腳本。

聚合器(Aggregator):和分割器相反,它會把多條消息組合成一條。

最後是腳本,能夠用於調用特定的Groovy腳本做爲處理步驟。

槽(Sinks)

最簡單的槽是日誌和文件。其餘能夠支持的槽包括Hadoop(HDFS)、JDBC、TCP、Mail、RabbitMQ、GemFire服務器、Splunk服務器和MQQT。還有一個動態路由選項,容許基於SpEL表達式或Groovy腳本的值,把Spring XD消息路由到命名通道中。讓我有一點奇怪的是,在這裏缺乏通常目的的JMS槽,儘管咱們能夠像[url=http://www.infoq.com/cn/articles/(https://github.com/spring-projects/spring-xd/wiki/Extending-XD]這裏[/url]描述的同樣構建自定義的槽模塊。

實時分析

Spring XD爲各類機器學習評分算法的實時計算提供了支持,還爲使用各類類型的計數器和計量器進行實時數據分析提供了支持。分析功能是經過能夠添加到流中的模塊實現的。在那種狀況下,實時分析是經過和數據吸納同樣的模塊完成的。

儘管流的主要角色能夠是執行實時分析,但更爲常見的是添加一個tap來初始化另外一個流,其中分析——例如:一個字段值的計數器——會應用給經過主要流吸納的一樣數據之上。

Spring XD中自帶提供了一些簡單的分析工具,它們都實現爲抽象API,針對內存數據庫和Redis而實現,以下:

  • 簡單計數器
  • 字段值計數器:計算特定字段出現的次數。
  • 聚合計數器: 在Mongo和Redis之類的工具中比較常見,讓你能夠對數據根據時間——例如分鐘、小時、月、年等——進行分片。
  • 計量器(Gauge):最新的值
  • 富計量器:最新的值,運行的平均值,最大、最小值

對於預測性的分析,Spring XD包含了一個可擴展的類庫,基於它能夠構建其餘實現。例如在GitHub上提供的PMML模塊,它和JPMML-Evaluator庫集成,爲更廣範圍內的模型類型提供了支持,而且能夠與從RRattleKNIMERapidMiner導出的模塊進行互操做。

產品還包含了一些抽象,能夠在流處理應用程序中事件分析模型。在撰寫這篇文章的時候,只支持預測性模塊標記語言(Predictive Model Markup Language,PMML),但Pivotal告訴InfoQ:

咱們正在進行一個內部項目,以提供普遍的分析解決方案,它的目標是圍繞「欺詐檢測」和「網絡安全」之類的狀況。咱們還在與OSS庫——像「stream-lib」和「graphlab」——的整合作了一些設計。

Pivotal還說明,他們指望,隨着時間的推移可以在這個領域看到發展,而且對預測性建模提供額外的支持。

批處理做業、工做流調度和導出

除了流以外,Spring XD還包含了基於Spring Batch啓動和監控批處理做業的功能,而Spring Batch也被用於支持工做流調度和導出用例。

工做流的概念會被轉換成批處理做業,那能夠被認爲是各個步驟的有向圖,每一個圖都是一個處理步驟:

根據配置的狀況,步驟能夠順序或者並行執行。它們能夠複製或者處理來自於文件、數據庫、MapReduce、Pig、Hive或Cascading做業的數據,而且和容許重啓的檢查點一塊兒持久化。和流同樣,做業支持單節點,或者能夠和數據分區一塊兒分佈。

Spring XD自身帶有少許預約義的做業,能夠用來向Hadoop文件系統HDFS導出數據,或者從中導入數據。這些做業覆蓋了FTP到HDFS、HDFS到JDBC、HDFS到MongoDB和JDBC到HDFS。還有一個做業用於向JDBC導出文件。你能夠在/libexec/xd/modules/job文件夾中找到。

Spring XD提供了至關基礎的、基於瀏覽器的圖形化界面,當前讓你能夠執行和任務相關的批處理做業。對於啓動Spring XD,管理員界面在這裏提供:

(點擊圖像能夠放大)

正如在上面的截屏中能夠看到的,管理員界面當前包括四個標籤頁:

  • 模塊:列舉了可用的批處理做業和更多細節(像做業模塊選項以及模塊的XML配置文件)。
  • 定義:列舉了XD批處理做業定義,並提供了部署或者卸載那些做業的動做。
  • 部署:列舉了全部部署了的做業,並提供了一種選項來啓動部署好的做業。一旦做業已經部署,它就能夠經過管理員界面啓動。
  • 執行:列舉了批處理做業的執行情況,並提供了一種選項,若是批處理做業能夠重啓,而且處於中止或者失敗狀態,那麼就重啓。
結論

Spring XD當前還處於開發中。第一個里程碑版本已經在2013年六月發佈,而GA版本指望在今年(2014年)七月發佈。它基於Apache第二版許可。在GitHub上提供了源代碼示例。你還能夠找到在線的Sonar代碼度量

產品可能還很新,但正如咱們看到的,它構建在成熟的基礎之上——Spring Batch、Spring Integration和Sping Data,以及Reactor項目、LMAX Disruptor和Apache Hadoop——並提供了一種輕量級的運行時環境,能夠經過DSL來配置和集成,只須要不多代碼,甚至不須要。Spring XD爲開發者提供了一種便利的方式,能夠開始構建大數據應用程序,爲構建和部署這樣的應用程序提供了「一站式服務」。

對於想要探索這個產品的讀者,有大量資源可用,包括主要的wiki,還有覆蓋了實時分析的視頻

關於做者

Charles Humble從2014年三月開始擔任InfoQ.com編輯團隊的主編,引領咱們的內容建立工做,包括新聞、文章、書籍、視頻和採訪。在全職加入InfoQ以前,Charles領導過咱們的Java部分工做,是PRPi顧問公司的CTO,該公司是一家簡歷研究公司,在2012年七月被PwC收購。他做爲開發者、架構師和開發經理在軟件企業中工做了近20年。在空閒時間,他會寫一些音樂,而且是倫敦周邊的技術小組Twofish的成員。

相關文章
相關標籤/搜索