Storm中有個特殊的Executor叫acker,它們負責跟蹤spout發出的每個Tuple的Tuple樹。當acker發現一個Tuple樹已經處理完成了,它會告訴框架回調Spout的ack(),否則回調Spout的fail()。
Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只需要恆定的20字節就可以進行跟蹤。
我們期望的是,如果某個Tuple被Bolt執行失敗了,則Spout端可以重新發送該Tuple。但很遺憾的是,框架不會自動重新發送,需要我們自己手工編碼實現。後續給大家實戰案例!
什麼是Tuple樹?
Spout類代碼如下:
package les19.Ack_Fail;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that reads tab-separated order records from {@code order.log} and emits one
 * {@code (date, orderAmt)} tuple per well-formed line, each tagged with a random
 * {@link UUID} message id so that it can be acked or failed.
 *
 * <p>Storm does not replay failed tuples by itself, so this spout keeps every emitted
 * tuple in {@code _pending} until it is acked, and re-emits it from {@link #fail(Object)}
 * until it has failed {@value #MAX_FAIL_COUNT} times, after which it is dropped.
 */
public class AckSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    /** Number of failures after which a tuple is abandoned instead of being replayed. */
    private static final int MAX_FAIL_COUNT = 3;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    // Emitted tuples that have not been acked yet, keyed by message id, kept for replay.
    private ConcurrentHashMap<Object, Values> _pending;
    // Failure count per message id, for tuples that have failed at least once.
    private ConcurrentHashMap<Object, Integer> fail_pending;
    SpoutOutputCollector collector = null;
    // Most recently read input line; null once the end of the file has been reached.
    String str = null;

    /**
     * Reads the next input line and, if it is well formed, emits it as a tracked tuple.
     *
     * <p>At most one tuple is emitted per call: Storm's ISpout contract asks for a
     * non-blocking {@code nextTuple}, and draining the whole file in a single call would
     * keep ack/fail callbacks from being processed until the file is exhausted.
     * Malformed lines are reported and skipped; read errors are handed to
     * {@code collector.reportError} instead of being silently swallowed.
     */
    @Override
    public void nextTuple() {
        try {
            str = br.readLine();
        } catch (IOException e) {
            collector.reportError(e);
            return;
        }
        if (str == null) {
            // End of input: nothing left to emit.
            return;
        }

        String[] arr = str.split("\t");
        // Column 1 is the order amount; column 2 must start with a 10-character date.
        if (arr.length < 3 || arr[2].length() < 10) {
            System.out.println("spout skip malformed line:" + str);
            return;
        }

        UUID msgId = UUID.randomUUID();
        String date = arr[2].substring(0, 10);
        String orderAmt = arr[1];
        Values val = new Values(date, orderAmt);
        // Remember the tuple before emitting so that fail() can always find it.
        _pending.put(msgId, val);
        collector.emit(val, msgId);
        System.out.println("_pending.size()=" + _pending.size());
    }

    /**
     * Closes the input file. Safe to call even when {@code open} never succeeded.
     */
    @Override
    public void close() {
        if (br == null) {
            return;
        }
        try {
            // Closing the outermost reader also closes the wrapped reader and stream.
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Initializes the spout: stores the collector, creates the tracking maps and opens
     * {@code order.log} as UTF-8 text.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit tuples and report errors
     * @throws IllegalStateException if the input file cannot be opened; failing here is
     *         preferable to a spout that silently never emits anything
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        _pending = new ConcurrentHashMap<Object, Values>();
        fail_pending = new ConcurrentHashMap<Object, Integer>();
        try {
            this.fis = new FileInputStream("order.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (IOException e) {
            throw new IllegalStateException("cannot open input file order.log", e);
        }
    }

    /**
     * Declares the two output fields, {@code date} and {@code orderAmt}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "orderAmt"));
    }

    /**
     * @return {@code null}; this spout supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Called when the tuple tree for {@code msgId} completed: forgets the tuple and any
     * retry counter recorded for it.
     *
     * @param msgId message id passed to {@code emit} for the acked tuple
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("_pending size 共有:" + _pending.size());
        System.out.println("spout ack:" + msgId.toString() + "---" + msgId.getClass());
        this._pending.remove(msgId);
        // A tuple that succeeded on a retry must not leave its failure counter behind.
        this.fail_pending.remove(msgId);
        System.out.println("_pending size 剩餘:" + _pending.size());
    }

    /** No action is taken on activation. */
    @Override
    public void activate() {
    }

    /** No action is taken on deactivation. */
    @Override
    public void deactivate() {
    }

    /**
     * Called when the tuple for {@code msgId} failed: re-emits it under the same message
     * id, unless it has now failed {@value #MAX_FAIL_COUNT} times, in which case it is
     * dropped from both tracking maps.
     *
     * @param msgId message id passed to {@code emit} for the failed tuple
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());

        Values val = this._pending.get(msgId);
        if (val == null) {
            // Not tracked (already acked or already abandoned): nothing to replay.
            fail_pending.remove(msgId);
            return;
        }

        Integer previous = fail_pending.get(msgId);
        int failCount = (previous == null ? 0 : previous.intValue()) + 1;
        if (failCount >= MAX_FAIL_COUNT) {
            // Retry budget exhausted: stop replaying and release the stored tuple.
            fail_pending.remove(msgId);
            this._pending.remove(msgId);
        } else {
            fail_pending.put(msgId, failCount);
            this.collector.emit(val, msgId);
        }
    }
}
Bolt如下:
package les19.Ack_Fail;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that takes {@code (date, orderAmt)} tuples, parses the amount into a
 * {@code Double} and forwards the pair anchored to the input tuple. The input is acked
 * on success and failed when any exception is raised while handling it.
 */
public class AckBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    TopologyContext context = null;
    int num = 0;
    String url = null;
    String session_id = null;
    // Date field of the tuple most recently passed to execute().
    String date = null;
    String province_id = null;

    /**
     * Initialization hook (the bolt-side counterpart of a spout's {@code open}); keeps
     * the context and the collector for later use.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.context = context;
    }

    /**
     * Handles one tuple: reads {@code date} and {@code orderAmt}, emits them with the
     * amount converted to a number, then acks the input. On any exception the input is
     * failed and the stack trace is printed.
     *
     * @param input the tuple to process
     */
    @Override
    public void execute(Tuple input) {
        try {
            this.date = input.getStringByField("date");
            String rawAmount = input.getStringByField("orderAmt");
            double amount = Double.parseDouble(rawAmount);
            Values outgoing = new Values(this.date, amount);
            // The first argument anchors the emitted tuple to the input tuple.
            this.collector.emit(input, outgoing);
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
            e.printStackTrace();
        }
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Declares the two output fields, {@code date} and {@code orderAmt}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("date", "orderAmt");
        declarer.declare(outputFields);
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
TOPO類如下:
package les19.Ack_Fail;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code AckSpout} to {@code AckBolt} and submits the topology.
 */
public class Ack_FailTopo {

    /**
     * Builds the spout -> bolt topology and submits it.
     *
     * @param args when non-empty, {@code args[0]} is the topology name used for
     *             submission through {@code StormSubmitter}; when empty, the topology
     *             runs in a {@code LocalCluster} under the name {@code mytopology}
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new AckSpout(), 1);
        topologyBuilder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

        Config config = new Config();
        //config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
        config.setDebug(false);

        if (args.length == 0) {
            // No topology name given: run in-process.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("mytopology", config, topologyBuilder.createTopology());
            return;
        }

        try {
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
想了解更多,見我的51CTO上的Storm視頻教程 http://edu.51cto.com/course/course_id-9041.html ,本節來自第18-19講。