Storm pitfalls: the synchronization problem

  Recently I have been building a monitoring system that tracks the call volume and processing time of each business function on our site, so that problems can be spotted and dealt with promptly. For this kind of real-time statistics processing, Storm is the natural choice, so I learned it as I went along, and naturally ran into some pits, many of which are hard to find answers for online. Here I'm recording the error that frustrated me the most.

  My business logic aggregates call counts minute by minute, so I ran a timer inside a bolt that periodically sends the aggregated statistics to the next bolt for persistence. In other words, the code executed by the timer called OutputCollector to emit to the next bolt. Local debugging showed no problems, so I deployed it to the external test environment. Most of the time nothing went wrong there either, but occasionally this error appeared, and as a developer the most hateful kind of error is the one with a very low reproduction rate.
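
  For context, here is a minimal sketch of the pattern just described. The class and field names are made up for illustration (this is not my actual production code), but the shape is the same: aggregate in execute() and emit from a java.util.Timer callback, which runs on the Timer's own thread.

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical sketch only -- illustrates the shape of the production code.
public class MinuteStatsBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	private OutputCollector collector;
	private final AtomicLong callCount = new AtomicLong();

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
		// Emit the minute's running total once every 60 seconds.
		new Timer(true).scheduleAtFixedRate(new TimerTask() {
			@Override
			public void run() {
				// BUG: this runs on the Timer's own thread, not the bolt's
				// executor thread, and OutputCollector is not thread-safe.
				// This is exactly what triggers the error below.
				MinuteStatsBolt.this.collector.emit(new Values(callCount.getAndSet(0)));
			}
		}, 60 * 1000, 60 * 1000);
	}

	@Override
	public void execute(Tuple input) {
		callCount.incrementAndGet();
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("count"));
	}
}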

  Here is the error log:

5675 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: java.lang.NullPointerException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
Caused by: java.lang.NullPointerException: null
	at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
	... 6 common frames omitted
	
	
5697 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]

  If you've run into this problem too, I'm sure the first time you saw this error was painful: every frame in the log sits inside Storm's worker and Disruptor queue internals, with no trace of your own business code, so there is simply nothing to locate the problem from. What makes it truly agonizing is that it isn't easy to reproduce either.

  After many rounds of guessing and experimenting, I finally pinned down the cause. First, here is an example program that reproduces the error:

public class Main {

	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout",new TestWordSpout());
		
		builder.setBolt("dispatch", new WordDispatchBolt()).shuffleGrouping("spout");
		builder.setBolt("print",new PrintBolt()).fieldsGrouping("dispatch", new Fields("word"));
        
		Config conf = new Config();
		
		conf.setDebug(false);
		conf.setNumWorkers(1);
		//conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("test-kafka-1", conf, builder.createTopology());
	}

}

  

public class TestWordSpout extends BaseRichSpout {
   
  private static final long serialVersionUID = 1L;
    boolean _isDistributed;
    SpoutOutputCollector _collector;
    String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(1000);
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), word + rand.nextDouble());
    }
    
    public void ack(Object msgId) {
    	System.out.println("### ack:"+msgId);
    }

    public void fail(Object msgId) {
        System.out.println("### fail:"+msgId);
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
  

public class WordDispatchBolt extends BaseRichBolt{

	private static final long serialVersionUID = 1L;
	private OutputCollector collector;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
		
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				while(true){
					send(); // no sleep here; otherwise the odds of triggering the exception are too small to observe
				}
			}
		}).start();
	}

	public void send(){
		this.collector.emit(new Values(new Random().nextDouble()));
	}
	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		this.collector.emit(new Values(word));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
	
}

  

public class PrintBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		System.out.println(input.getValue(0));
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}


}

  This code is simple, so I won't walk through it in detail. In the WordDispatchBolt class I start a separate thread to emit data to the next bolt. My business code does something similar, except it sends the data via a Timer (under the hood a Timer is just another thread, so it amounts to the same thing). But since the Timer fires only once per minute, the probability of hitting the problem was pitifully small; here I deliberately call emit in a tight loop with no pause so that the exception occurs much more readily.

  If you run the example code above, you are certain to hit the error shown earlier. The root cause is that OutputCollector is not thread-safe: all calls on it are expected to happen on the bolt's executor thread, and emitting concurrently from another thread corrupts the executor's send queue, which is why the worker dies with that opaque NullPointerException. If you don't know this, debugging it is pure misery. Once we know it's a synchronization problem, we can either avoid calling the collector from other threads altogether, or serialize the calls. Below is the simple solution I came up with (if anyone knows a better one, please leave a comment).

  Modify the WordDispatchBolt class as follows:

public class WordDispatchBolt extends BaseRichBolt{

	private static final long serialVersionUID = 1L;
	private OutputCollector collector;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
		
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				while(true){
					send(new Values(new Random().nextDouble())); // no sleep here; otherwise the odds of triggering the exception are too small to observe
				}
			}
		}).start();
	}

	public synchronized void send(List<Object> tuple){
		this.collector.emit(tuple);
	}
	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		send(new Values(word));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
	
}
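
  Another direction is a sketch of an alternative I can think of, under the assumption that tuples keep flowing into execute(): don't touch the collector from the extra thread at all. The background thread only appends to a concurrent queue, and execute() drains that queue, so every emit happens on the executor thread.

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordDispatchBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	private OutputCollector collector;
	private final Queue<List<Object>> pending = new ConcurrentLinkedQueue<List<Object>>();

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
		new Thread(new Runnable() {
			@Override
			public void run() {
				while (true) {
					// Enqueue only; never touch the collector from this thread.
					pending.offer(new Values(new Random().nextDouble()));
					Utils.sleep(10); // pace the producer; the tight loop was only needed to reproduce the bug
				}
			}
		}).start();
	}

	@Override
	public void execute(Tuple input) {
		// Drain everything the background thread queued since the last tuple,
		// so all emits happen on the executor thread.
		List<Object> tuple;
		while ((tuple = pending.poll()) != null) {
			this.collector.emit(tuple);
		}
		this.collector.emit(new Values(input.getStringByField("word")));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

  This also keeps ack and fail on the executor thread, which the synchronized send() above does not cover. The drawback is that queued tuples are only flushed when an input tuple arrives, so it fits topologies with a steady input stream; Storm's tick tuples could drive the flush on a schedule instead.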

 

  With that, this pit is basically dealt with. I expect to be using Storm heavily from here on, and I'll keep writing these pits down as I run into them.

 

  "Write down the pits you fall into, so that those who hit them later have more to find online, and spend less time and anguish tracking the problem down."
