Strom-(2)核心應用開發

  1. Storm中,SpoutBolt都是ComponentStorm定義了一個名叫IComponent的總接口java

ØSpout的最頂層抽象是ISpout接口。數據庫

一般狀況下(Shell和事務型的除外),實現一個Spout,能夠直接實現接口IRichSpout併發

若是不想寫多餘的代碼,能夠直接繼承BaseRichSpoutdom

ØBolt的最頂層抽象是IBolt接口分佈式

Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功ide


主要開發流程:工具

1、實現Spout接口IRichSpout或者繼承類BaseRichSpoutui

2、實現Bolt接口IRichBolt或者IBasicBolt或者繼承類BaseBasicBoutthis

3、使用TopologyBuilder定義Topology(setSpout/ setBolt/ Groupingspa

      一個SpoutBolt的併發執行單元數和併發任務數在此設置)

4、配置TOPOLOGY_WORKERS等參數;

5、利用TopologyBuilder方法createTopology建立拓撲;

6、提交拓撲 StormSubmitter.submitTopology

Topology

/**
 * 定義了一個簡單的topology,包括一個數據噴發節點spout和一個數據處理節點bolt。
 */
public class SimpleTopology
{

	public static void main(String[] args)
	{
		try
		{
			// 實例化TopologyBuilder類。
			TopologyBuilder topologyBuilder = new TopologyBuilder();
			// 設置噴發節點並分配併發數,該併發數將會控制該對象在集羣中的線程數。
			topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
			// 設置數據處理節點並分配併發數。指定該節點接收噴發節點的策略爲隨機方式。
			topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping(
					"SimpleSpout");
			Config config = new Config();
			config.setDebug(true);
			if (args != null && args.length > 0)
			{
				config.setNumWorkers(1);
				StormSubmitter.submitTopology(args[0], config,
						topologyBuilder.createTopology());
			}
			else
			{
				// 這裏是本地模式下運行的啓動代碼。
				config.setMaxTaskParallelism(1);
				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology
			}
		}
	}
}

spout

/**
 * Spout起到和外界溝通的做用,他能夠從一個數據庫中按照某種規則取數據,也能夠從分佈式隊列中取任務
 */
@SuppressWarnings("serial")
public class SimpleSpout extends BaseRichSpout
{

	// 用來發射數據的工具類
	private SpoutOutputCollector collector;
	private static String[] info = new String[] {
			"comaple\t,12424,44w46,654,12424,44w46,654,",
			"lisi\t,435435,6537,12424,44w46,654,",
			"lipeng\t,45735,6757,12424,44w46,654,",
			"hujintao\t,45735,6757,12424,44w46,654,",
			"jiangmin\t,23545,6457,2455,7576,qr44453",
			"beijing\t,435435,6537,12424,44w46,654,",
			"xiaoming\t,46654,8579,w3675,85877,077998,",
			"xiaozhang\t,9789,788,97978,656,345235,09889,",
			"ceo\t,46654,8579,w3675,85877,077998,",
			"cto\t,46654,8579,w3675,85877,077998,",
			"zhansan\t,46654,8579,w3675,85877,077998," };
	Random random = new Random();

	/**
	 * 初始化collector
	 */
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
	{
		this.collector = collector;
	}

	/**
	 * 在SpoutTracker類中被調用,每調用一次就能夠向storm集羣中發射一條數據(一個tuple元組),該方法會被不停的調用
	 */
	@Override
	public void nextTuple()
	{
		try
		{
			String msg = info[random.nextInt(11)];
			// 調用發射方法
			collector.emit(new Values(msg));
			// 模擬等待100ms
			Thread.sleep(100);
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
	}

	/**
	 * 定義字段id,該id在簡單模式下沒有用處,但在按照字段分組的模式下有很大的用處。
	 * 該declarer變量有很大做用,咱們還能夠調用declarer.declareStream();來定義stramId,該id能夠用來定義更加複雜的流拓撲結構
	 */
	@Override
	public void declareOutputFields(OutputFieldsD

bolt

package com.zmq.helloword;

/**
 * @author zhangmq(工號:68598) Tel:☎
 * @version 1.0
 * @since 2014-8-26 下午2:46:53
 * @category com.ljq.helloword
 * @copyright 南京聯創科技 網管科技部
 */
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 接收噴發節點(Spout)發送的數據進行簡單的處理後,發射出去。
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseBasicBolt
{

	public void execute(Tuple input, BasicOutputCollector collector)
	{
		try
		{
			String msg = input.getString(0);
			if (msg != null)
			{
				// System.out.println("msg="+msg);
				collector.emit(new Values(msg + "msg is processed!"));
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
	}
相關文章
相關標籤/搜索