Trident是在storm基礎上,一個以realtime 計算爲目標的高度抽象。 它在提供處理大吞吐量數據能力的同時,也提供了低延時分佈式查詢和有狀態流式處理的能力。 若是你對Pig和Cascading這種高階批量處理工具很瞭解的話,那麼你會很容易理解Trident的概念。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此以外,Trident 還提供了一些專門的原語,從而在基於數據庫或者其餘存儲的前提下來應付有狀態的遞增式處理。Trident擁有一致的、exactly-once的語義,因此很容易去推導一個Trident Topology。html
讓咱們一塊兒來看一個關於Trident的例子。在這個例子中,主要作了兩件事情:java
爲了達到說明的目的,這個例子會從一個無限的輸入流中讀取語句做爲輸入:node
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true); |
這個spout會循環輸出語句集合到sentence stream當中,下面的代碼會以這個stream做爲輸入並計算每一個單詞的個數:mysql
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6); |
讓咱們一塊兒來讀一下這段代碼。咱們首先建立了一個TridentTopology對象,TridentTopology類暴露接口來構造Trident計算過程當中的全部內容。TridentTopology有一個newStream方法來建立一個數據流,以供topology讀取輸入源。這個例子中,輸入源是就是前面定義的FixedBatchSpout。輸入源也能夠是 Kestrel 或者Kafka的queue brokers. Trident會在Zookeeper中保存一小部分狀態信息來追蹤數據的處理狀況(關於數據是否已經被消費的元數據)。代碼中咱們指定的字符串「spout1」就是Zookeeper中用來存儲metadata信息的Znode節點git
Trident會把輸入流轉換成若干個tuple的batch來處理。例如,輸入的sentence stream可能會被拆分紅以下的batch:github
通常來講,這些小的batch中的tuple可能會在數千或者數百萬這樣的數量級,取決於輸入的吞吐量。sql
Trident提供了一系列很是成熟的批量處理的API來處理這些小batch. 這些API和你在Pig或者Cascading中看到的很是相似, 你能夠作group by's, joins, aggregations, 運行 functions, 執行 filters等等。固然,獨立的處理每一個小的batch並非很是有趣的事情,因此Trident提供了不少功能來實現batch之間的聚合的結果並能夠將這些聚合的結果存儲到內存,Memcached, Cassandra或者是一些其餘的存儲中。同時,Trident還提供了很是好的功能來查詢實時狀態。這些實時狀態能夠被Trident更新,同時它也能夠是一個獨立的狀態源。數據庫
回到咱們的這個例子中來,spout輸出了一個只有單一字段"sentence"的數據流。在下一行,topology使用了Split函數來拆分stream中的每個tuple,Split函數讀取輸入流中的"sentence"字段並將其拆分"word"。每個sentence tuple可能會被轉換成多個word tuple,好比說"the cow jumped over the moon" 會被轉換成6個 "word" tuples. 下面是Split的定義:apache
public class Split extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } } } |
如你所見,真的很簡單。它只是簡單的根據空格拆分sentence,並將拆分出的每一個word做爲一個tuple輸出。併發
topology的其餘部分計算單詞的個數並將計算結果保存到了持久存儲中。首先stream被根據"word"字段執行group操做,而後每個group使用Count聚合器進行持久化聚合。persistentAggregate會幫助你把一個狀態源聚合的結果存儲或者更新到存儲當中。在這個例子中,單詞的數量被保持在內存中,不過咱們能夠很簡單的把這些數據保存到其餘的存儲當中,如 Memcached, Cassandra等。若是咱們要把結果存儲到Memcached中,只是簡單的使用下面這句話替換掉persistentAggregate就能夠,這當中的"serverLocations"是Memcached cluster的主機和端口號列表:
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count")) MemcachedState.transactional() |
persistentAggregate存儲的數據就是全部batch聚合的結果。
Trident很是酷的一點就是它是徹底容錯的,exactly-one處理語義。這就讓你能夠很輕鬆的使用Trident來進行實時數據處理。Trident會把狀態以某種形式保持起來,當有錯誤發生時,它會根據須要來恢復這些狀態。
persistentAggregate方法會把數據流轉換成一個TridentState對象。在這個例子當中,TridentState對象表明了全部的單詞的數量。咱們會使用這個TridentState對象來實如今計算過程當中的進行分佈式查詢。
下面這部分實現了一個低延時的單詞數量的分佈式查詢。這個查詢以一個用空格分割的單詞列表爲輸入,並返回這些單詞當天的個數。這些查詢是想普通的RPC調用那樣被執行的,要說不一樣的話,那就是他們在後臺是並行執行的。下面是執行查詢的一個例子:
DRPCClient client = new DRPCClient("drpc.server.location", 3772); System.out.println(client.execute("words", "cat dog the man"); // prints the JSON-encoded result, e.g.: "[[5078]]" |
如你所見,除了這是併發執行在storm cluster上以外,這看上去就是一個正常的RPC調用。這樣的簡單查詢的延時一般在10毫秒左右。固然,更復雜的DRPC調用可能會使用更長的時間,儘管延時很大程度上是取決於你給計算分配了多少資源。
這個分佈式查詢的實現以下所示:
topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); |
咱們仍然是使用TridentTopology對象來建立DRPC stream,而且咱們將這個函數命名爲「words」。這個函數名會做爲第一個參數在使用DRPC Client來執行查詢的時候用到。
每一個DRPC請求會被當作只有一個tuple的batch job來處理。在處理過程當中,單一tuple來表示這個請求。這個tuple包含了一個叫作「args」的字段,在這個字段中保存了客戶端提供的查詢參數。在這個例子中,這個參數是一個以空格分割的單詞列表。
首先,咱們使用Splict功能把入參拆分紅獨立的單詞。而後對「word」 進行group by操做,以後就可使用stateQuery來在上面代碼中建立的TridentState對象上進行查詢。stateQuery接受一個數據源(在這個例子中,就是咱們的topolgoy所計算的單詞的個數)以及一個用於查詢的函數做爲輸入。在這個例子中,咱們使用了MapGet函數來獲取每一個單詞的出現個數。因爲DRPC stream是使用跟TridentState徹底一樣的group方式(按照「word」字段進行group),每一個單詞的查詢會被路由到TridentState對象管理和更新這個單詞的分區去執行。
接下來,咱們用FilterNull這個過濾器把從未出現過的單詞給去掉,並使用Sum這個聚合器將這些count累加起來。最終,Trident會自動把這個結果發送回等待的客戶端。
Trident在如何最大程度的保證執行topogloy性能方面是很是智能的。在topology中會自動的發生兩件很是有意思的事情:
讓咱們再來看一下Trident的另一個例子。
下一個例子是一個純粹的DRPC topology。這個topology會計算一個給定URL的reach。那麼什麼是reach呢,這裏咱們將reach定義爲有多少個獨立用戶在Twitter上面expose了一個給定的URL。要計算reach,你須要獲取曾經tweet過這個URL的全部人,而後找到全部follow這些人的人,並將這些follower去重,最後就獲得了去重後的follower的數量。若是把計算reach的整個過程都放在一臺機器上面,就太勉強了,由於這會須要進行數千次數據庫調用以及上億次的tuple的讀取。若是使用Storm和Trident,你就能夠把這些計算步驟在整個cluster中進行併發執行。
這個topology會讀取兩個state源。一個用來保存URL以及tweet這個URL的人的關係的數據庫。還有一個保持人和他的follower的關係數據庫。topology的定義以下:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); topology.newDRPCStream("reach") .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) .shuffle() .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) .parallelismHint(200) .each(new Fields("followers"), new ExpandList(), new Fields("follower")) .groupBy(new Fields("follower")) .aggregate(new One(), new Fields("one")) .parallelismHint(20) .aggregate(new Count(), new Fields("reach")); |
這個topology使用newStaticState方法建立了TridentState對象來表明一種外部存儲。使用這個TridentState對象,咱們就能夠在這個topology上面進行動態查詢了。和全部的狀態源同樣,在數據庫上面的查詢會自動被批量執行,從而最大程度的提高效率。
這個topology的定義是很是直觀的 - 只是一個簡單的批量處理job。首先,查詢urlToTweeters數據庫來獲得tweet過這個URL的人員列表。這個查詢會返回一個列表,所以咱們使用ExpandList函數來把每個反悔的tweeter轉換成一個tuple。
接下來,咱們來獲取每一個tweeter的follower。咱們使用shuffle來把要處理的tweeter分佈到toplology運行的每個worker中併發去處理。而後查詢follower數據庫從而的到每一個tweeter的follower。你能夠看到咱們爲topology的這部分分配了很大的並行度,這是由於這部分是整個topology中最耗資源的計算部分。
而後咱們在follower上面使用group by操做進行分組,並對每一個組使用一個聚合器。這個聚合器只是簡單的針對每一個組輸出一個tuple 「One」,再count 「One」 從而的到不一樣的follower的數量。「One」聚合器的定義以下:
public class One implements CombinerAggregator<Integer> { public Integer init(TridentTuple tuple) { return 1; } public Integer combine(Integer val1, Integer val2) { return 1; } public Integer zero() { return 1; } } |
這是一個"彙總聚合器", 它會在傳送結果到其餘worker彙總以前進行局部彙總,從而來最大程度上提高性能。Sum也是一個彙總聚合器,所以以Sum做爲topology的最終操做是很是高效的。
接下來讓咱們一塊兒來看看Trident的一些細節。
Trident的數據模型就是TridentTuple - 一個有名的值的列表。在一個topology中,tuple是在一系列的處理操做(operation)中增量生成的。operation通常以一組子彈做爲輸入並輸出一組功能字段。Operation的輸入字段常常是輸入tuple的一個子集,而功能字段則是operation的輸出。
看一下以下這個例子。假定你有一個叫作「stream」的stream,它包含了「x」,"y"和"z"三個字段。爲了運行一個讀取「y」做爲輸入的過濾器MyFilter,你能夠這樣寫:
stream.each(new Fields("y"), new MyFilter()) |
假定MyFilter的實現是這樣的:
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) < 10; } } |
這會保留全部「y」字段小於10的tuples。TridentTuple傳個MyFilter的輸入將只有字段「y」。這裏須要注意的是,當選擇輸入字段時,Trident會自動投影tuple的一個子集,這個操做是很是高效的。
讓咱們一塊兒看一下「功能字段」是怎樣工做的。假定你有以下這個功能:
public class AddAndMultiply extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int i1 = tuple.getInteger(0); int i2 = tuple.getInteger(1); collector.emit(new Values(i1 + i2, i1 * i2)); } } |
這個函數接收兩個數做爲輸入並輸出兩個新的值:「和」和「乘積」。假定你有一個stream,其中包含「x」,"y"和"z"三個字段。你能夠這樣使用這個函數:
stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied")); |
輸出的功能字段被添加到輸入tuple中。所以這個時候,每一個tuple中將會有5個字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"對應於AddAndMultiply輸出的第一和第二個字段。
另外,咱們可使用聚合器來用輸出字段來替換輸入tuple。若是你有一個stream包含字段"val1"和"val2",你能夠這樣作:
stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum")) |
output stream將會只包含一個叫作「sum」的字段,這個sum字段就是「val2」的累積和。
在group以後的stream上,輸出將會是被group的字段以及聚合器輸出的字段。舉例以下:
stream.groupBy(new Fields("val1")).aggregate(new Fields("val2"), new Sum(), new Fields("sum")) |
在這個例子中,輸出將包含字段"val1" 和 "sum".
在實時計算領域的一個主要問題就是怎麼樣來管理狀態並能輕鬆應對錯誤和重試。消除錯誤的影響是很是重要的,由於當一個節點死掉,或者一些其餘的問題出現時,那行batch須要被從新處理。問題是-你怎樣作狀態更新來保證每個消息被處理且只被處理一次?
這是一個很棘手的問題,咱們能夠用接下來的例子進一步說明。假定你在作一個你的stream的計數聚合,而且你想要存儲運行時的count到一個數據庫中去。若是你只是存儲這個count到數據庫中,而且想要進行一次更新,咱們是沒有辦法知道一樣的狀態是否是之前已經被update過了的。此次更新可能在以前就嘗試過,而且已經成功的更新到了數據庫中,不過在後續的步驟中失敗了。還有多是在上次更新數據庫的過程當中失敗的,這些你都不知道。
Trident經過作下面兩件事情來解決這個問題:
有了這2個原則,你就能夠達到有且只有一次更新的目標。你能夠將transaction id和count一塊兒以原子的方式存到數據庫中。當更新一個count的時候,須要判斷數據庫中當前batch的transaction id。若是跟要更新的transaction id同樣,就跳過此次更新。若是不一樣,就更新這個count。
固然,你不須要在topology中手動處理這些邏輯。這些邏輯已經被封裝在Stage的抽象中並自動進行。你的Stage object也不須要本身去實現transaction id的跟蹤操做。若是你想了解更多的關於如何實現一個Stage以及在容錯過程當中的一些取捨問題,能夠參照in this doc.
一個Stage能夠採用任何策略來存儲狀態。它能夠存儲到一個外部的數據庫,也能夠在內存中保持狀態並備份到HDFS中。Stage並不須要永久的保持狀態。好比說,你有一個內存版的Stage實現,它保存最近X個小時的數據並丟棄老的數據。能夠把Memcached integration 做爲例子來看看State的實現.
Trident的topology會被編譯成儘量高效的Storm topology。只有在須要對數據進行repartition的時候(如groupby或者shuffle)纔會把tuple經過network發送出去,若是你有一個trident以下:
它將會被編譯成以下的storm topology:
Trident使得實時計算更加優雅。你已經看到了如何使用Trident的API來完成大吞吐量的流式計算,狀態維護,低延時查詢等等功能。Trident讓你在獲取最大性能的同時,以更天然的一種方式進行實時計算。