假設您正在對流進行計數聚合,並但願將運行計數存儲在數據庫中。如今假設您在數據庫中存儲了一個表示計數的值,而且每次處理新元組時都會增長計數。php
發生故障時,將重發送元組。這會在執行狀態更新(或任何帶有反作用的事物)時出現問題 - 您不知道之前是否曾基於此元組成功更新狀態。也許你之前從未處理過元組,在這種狀況下你應該增長計數。也許你已經處理了元組併成功遞增了計數,可是元組在另外一個步驟中處理失敗。在這種狀況下,您不該增長計數。或許您以前看過元組但在更新數據庫時出錯。在這種狀況下,您應該更新數據庫。css
只需將計數存儲在數據庫中,您就不知道以前是否已經處理過這個元組。所以,您須要更多信息才能作出正確的決定。Trident提供如下語義,足以實現一次性處理語義:html
使用這些原語,您的State實現能夠檢測以前是否已經處理了一批元組,並採起適當的操做以一致的方式更新狀態。您採起的操做取決於輸入splot提供的確切語義,即每批中的內容。在容錯方面有三種可能的splot:「非事務性」,「事務性」和「不透明事務性」。一樣,在容錯方面有三種可能的狀態:「非事務性」,「事務性」和「不透明事務性」。讓咱們來看看每一個splot類型,看看每種噴口能夠達到什麼樣的容錯能力。java
請記住,Trident將元組做爲小批量處理,每一個批次都被賦予惟一的事務ID。spout的屬性根據它們能夠提供的關於每批中的含量的保證而變化。事務性spout具備如下屬性:git
爲何不老是使用事務性spout? 它們簡單易懂。您可能不使用它的一個緣由是由於它們不必定很是容錯。例如,TransactionalTridentKafkaSpout的工做方式是txid的批處理將包含來自主題的全部Kafka分區的元組。一旦批次被髮出,那麼在未來從新發出批次的任什麼時候候都必須發出徹底相同的元組集合以知足事務性噴口的語義。如今假設從TransactionalTridentKafkaSpout發出批處理,批處理沒法處理,同時其中一個Kafka節點發生故障。您如今沒法重播與以前相同的批次(由於節點已關閉且主題的某些分區不可用),github
這就是存在「不透明事務」spout的緣由 - 它們對丟失源節點具備容錯能力,同時仍容許您實現一次性處理語義。數據庫
(一方面注意 - 一旦Kafka支持複製,就有可能擁有對節點故障具備容錯能力的事務性spout,但該功能尚不存在。)apache
假設您的拓撲計算字數,而且您但願將字數存儲在鍵/值數據庫中。鍵將是單詞,值將包含計數。您已經看到只存儲計數,由於該值不足以知道您以前是否處理過一批元組。相反,您能夠作的是將事務id與數據庫中的count一塊兒存儲爲原子值。而後,在更新計數時,您只需將數據庫中的事務ID與當前批次的事務ID進行比較。若是它們是相同的,則跳過更新 - 因爲強大的排序,您肯定數據庫中的值包含當前批次。若是它們不一樣,則增長計數。這個邏輯有效,由於txid的批處理永遠不會改變,bash
假設您正在處理由如下一批元組組成的txid 3:併發
["man"]
["man"]
["dog"]
複製代碼
假設數據庫當前包含如下鍵/值對:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
複製代碼
與「man」關聯的txid爲txid 1.因爲當前txid爲3,所以您肯定該批次中未表示此批元組。所以,您能夠繼續將計數增長2並更新txid。另外一方面,「dog」的txid與當前的txid相同。所以,您肯定已知當前批次的增量已在數據庫中表示爲「dog」鍵。因此你能夠跳過更新。完成更新後,數據庫以下所示:
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
複製代碼
模糊事務型spout不能保證txid的一批元組保持不變。不透明的事務性spout具備如下屬性:
非事務型 spout不對每批中的物品提供任何保證。所以它可能最多隻進行一次處理,在這種狀況下,在批次失敗後不會重試元組。或者它可能具備至少一次處理,其中元組能夠經過多個批次成功處理。對於這種spout,沒有辦法實現徹底一次的語義。
模糊事務型 state 具備最好的容錯性特徵,不過這是以在數據庫中存儲更多的內容爲代價的(一個 txid 和兩個 value)。事務型 state 要求的存儲空間相對較小,可是它的缺點是隻對事務型 spout 有效。相對的,非事務型要求的存儲空間最少,可是它也不能提供任何的剛好一次的消息執行語義。
你選擇 state 與 spout 的時候必須在容錯性與存儲空間佔用之間權衡。能夠根據你的應用的需求來肯定哪一種組合最適合你。
簡單輸出數據
public class TridentTopology1 {
/**
* 接受一組輸入字段併發出零個或多個元組做爲輸出 (相似storm bolt數據流處理組件)
* @author qxw
* @data 2018年9月19日下午6:17:14
*/
public static class MyFunction extends BaseFunction {
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println("a: "+tuple.getIntegerByField("a"));
System.out.println("b: "+tuple.getIntegerByField("b"));
System.out.println("c: "+tuple.getIntegerByField("c"));
System.out.println("d: "+tuple.getIntegerByField("d"));
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
//固定批處理數據源(相似storm原生的spout) 聲明2個輸入的字段
FixedBatchSpout spout =new FixedBatchSpout(new Fields("a","b","c","d"),4,//設置批處理大小
new Values(1,4,7,10),
new Values(2,3,5,7),
new Values(6,9,7,2),
new Values(9,1,6,8) //設置數據內容
);
//是否循環發送
spout.setCycle(false);
//建立topology
TridentTopology topology =new TridentTopology();
//指定數據源
Stream input=topology.newStream("spout", spout);
//要實現storm原生spolt--bolt的模式在Trident中用each實現
input.each(new Fields("a","b","c","d"),
new MyFunction(),//執行函數 相似bolt
new Fields() //爲空不向下發送
);
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentTopology1", conf, topology.build());
}
}
複製代碼
經過要繼承BaseFilter,重寫isKeep方法
public class TridentTopology2 {
/**
* 能夠海量數據進行過濾,須要繼承BaseFilter,重寫isKeep方法
* @author qxw
* @data 2018年9月21日上午10:57:00
*/
public static class MyFilter extends BaseFilter {
private static final long serialVersionUID = 1L;
public boolean isKeep(TridentTuple tuple) {
//可以被2對第1個和第2個值進行相加.而後除2,爲0則發射,不爲零則不發射射
return tuple.getInteger(1) % 2 == 0;
}
}
/**
* 相似原生storm bolt數據流處理組件
* @author qxw
* @data 2018年9月21日下午3:31:12
*/
public static class MyFunction extends BaseFunction{
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//獲取tuple輸入內容
Integer a = tuple.getIntegerByField("a");
Integer b = tuple.getIntegerByField("b");
Integer c = tuple.getIntegerByField("c");
Integer d = tuple.getIntegerByField("d");
System.out.println("a: "+ a + ", b: " + b + ", c: " + c + ", d: " + d);
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
//固定批處理數據源(相似storm原生的spout) 聲明a,b,c,d四個字段
FixedBatchSpout spout =new FixedBatchSpout(new Fields("a","b","c","d"),4,//設置批處理大小
new Values(1,4,7,10),
new Values(2,3,5,7),
new Values(6,9,7,2),
new Values(9,1,6,8) //設置數據內容
);
//是否循環發送
spout.setCycle(false);
//建立topology
TridentTopology topology =new TridentTopology();
//指定數據源
Stream input=topology.newStream("spout", spout);
//要實現storm原生spolt--bolt的模式在Trident中用each實現 (隨機分組)
input.shuffle().each(new Fields("a","b","c","d"),new MyFilter()).each(new Fields("a","b","c","d"), new MyFunction(),new Fields());
//本地模式
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentTopology2", conf, topology.build());
//集羣模式
// StormSubmitter.submitTopology("TridentTopology1", conf, buildTopology());
}
複製代碼
public class TridentWordCount {
public static class MyFunction extends BaseFunction {
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
String word=tuple.getStringByField("word");
Long count=tuple.getLongByField("count");
System.out.println(word+" : "+count);
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
/* 建立spout */
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 4,
new Values("java php asd java"),
new Values("php css js html"),
new Values("js php java java"),
new Values("a a b c d"));
//是否循環發送
spout.setCycle(false);
/* 建立topology */
TridentTopology topology = new TridentTopology();
/* 建立Stream spout1, 分詞、統計 */
topology.newStream("spout", spout)
//先切割
.each(new Fields("sentence"), new Split(), new Fields("word"))
//分組
.groupBy(new Fields("word"))
//聚合統計
.aggregate(new Count(), new Fields("count"))
//輸出函數
.each(new Fields("word","count"), new MyFunction(),new Fields())
//設置並行度
.parallelismHint(1);
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentWordCount", conf, topology.build());
}
}
複製代碼
public class TridentDrpc {
private static class MyFunction extends BaseFunction{
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
String sentence = tridentTuple.getString(0);
for (String word : sentence.split(" ")) {
tridentCollector.emit(new Values(word));
}
}
}
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TridentTopology topology=new TridentTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
//本地模式
if (args.length==0){
LocalCluster cluster = new LocalCluster();
LocalDRPC drpc = new LocalDRPC();
Stream input=topology.newDRPCStream("data",drpc);
input.each(new Fields("args"),new MyFunction(),new Fields("result")).project(new Fields("result"));
cluster.submitTopology("wordCount", conf, topology.build());
//調用
System.err.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped"));
drpc.shutdown();
cluster.shutdown();
}else{
//集羣模式
conf.setNumWorkers(2);
StormSubmitter.submitTopology(args[0], conf, topology.build());
}
}
}
複製代碼