Storm入門(2)--Storm編程

以電信通話記錄爲例java

移動呼叫及其持續時間將做爲對Apache Storm的輸入,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數。linux

 Storm編程套路apache

在storm中,把對數據的處理過程抽象成一個topology,這個topology包含的組件主要是spout、bolt,以及以tuple形式在組件之間傳輸的數據流。這個數據流在topology流一遍,就是對數據的一次處理。編程

一、建立Spout類dom

這一部分,是建立數據流的源頭。ide

建立一個類,實現IRichSpout接口,實現相應方法。其中幾個方法的含義:ui

  • open -爲Spout提供執行環境。執行器將運行此方法來初始化噴頭。通常寫一些第一次運行時要處理的邏輯
  • nextTuple -經過收集器發出生成的數據。核心,用於生成數據流
  • close -當spout將要關閉時調用此方法。
  • declareOutputFields -聲明元組的輸出模式。即,聲明瞭今後spout出去的流都的數據格式
  • ack -確認處理了特定元組。
  • fail -指定不處理和不從新處理特定元組。
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 爲此spout提供storm配置。
  • context - 提供有關拓撲中的spout位置,其任務ID,輸入和輸出信息的完整信息。
  • collector - 使咱們可以發出將由bolts處理的元組。
nextTuple()

nextTuple()從與ack()和fail()方法相同的循環中按期調用。它必須釋放線程的控制,當沒有工做要作,以便其餘方法有機會被調用。所以,nextTuple的第一行檢查處理是否已完成。若是是這樣,它應該休眠至少一毫秒,以減小處理器在返回以前的負載。this

declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用於聲明輸出流id,輸出字段等,此方法用於指定元組的輸出模式。spa

ack(Object msgId)

該方法確認已經處理了特定元組。線程

fail(Object o)

此方法通知特定元組還沒有徹底處理。 Storm將從新處理特定的元組

package com.jing.calllogdemo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/*
spout類,負責產生數據流
 */
public class CallLogSpout implements IRichSpout {
    //spout 輸出收集器
    private SpoutOutputCollector collector;
    //是否完成
    private boolean completed = false;
    //上下文對象
    private TopologyContext context;
    //隨機發生器
    private Random randomGenerator = new Random();
    //索引
    private Integer idx = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //第一次運行要作的事
        this.context = topologyContext;
        this.collector = spoutOutputCollector;

    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void nextTuple() {
        //產生第一條數據,

        if (this.idx <= 1000){
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");

            Integer localIdx = 0;
            while (localIdx++ < 100 && this.idx++ <1000){
                //取出主叫
                String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                //取出被叫
                String callee = mobileNumbers.get(randomGenerator.nextInt(4));
                while (caller == callee){
                    //從新取出被叫
                    callee = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                //模擬通話時長
                Integer duration = randomGenerator.nextInt(60);
                //輸出元祖
                this.collector.emit(new Values(caller,callee,duration));
            }
        }

    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //聲明輸出字段,定義元組的結構,定義輸出字段名稱
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
CallLogSpout

二、建立Bolt類

這一部分是完成對數據流的處理,Bolt把元組做爲輸入,對元組進行處理後,產生新的元組,bolt類能夠有多個。

建立一個類,實現IRichBolt接口,實現相應方法。

  • prepare -爲bolt提供要執行的環境。執行器將運行此方法來初始化spout。
  • execute -處理單個元組的輸入
  • cleanup -當spout要關閉時調用。
  • declareOutputFields -聲明元組的輸出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -爲此bolt提供Storm配置。
  • context -提供有關拓撲中的bolt位置,其任務ID,輸入和輸出信息等的完整信息。
  • collector -使咱們可以發出處理的元組。
execute(Tuple tuple)

這是bolt的核心方法,這裏的元組是要處理的輸入元組。execute方法一次處理單個元組。元組數據能夠經過Tuple類的getValue方法訪問。沒必要當即處理輸入元組。多元組能夠被處理和輸出爲單個輸出元組。處理的元組能夠經過使用OutputCollector類發出。

cleanup()
declareOutputFields(OutputFieldsDeclarer declarer)

這個方法用於指定元組的輸出模式,參數declarer用於聲明輸出流id,輸出字段等。

這裏有兩個bolt

呼叫日誌建立者bolt接收呼叫日誌元組。呼叫日誌元組具備主叫方號碼,接收方號碼和呼叫持續時間。此bolt經過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式爲「來電號碼 - 接收方號碼」,並將其命名爲新字段「呼叫」

package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
/*
建立calllog日誌的bolt
 */
public class CallLogCreatorBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        //處理新的同話記錄
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        //產生新的tuple
        String fromTO = from + "-" + to;
        collector.emit(new Values(fromTO, duration));

    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //設置輸出字段的名稱
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
CallLogCreatorBolt

 呼叫日誌建立者bolt接收呼叫日誌元組。呼叫日誌元組具備主叫方號碼,接收方號碼和呼叫持續時間。此bolt經過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式爲「來電號碼 - 接收方號碼」,並將其命名爲新字段「呼叫」。

package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;
/*
通話記錄計數器bolt
 */
public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);
        if(!counterMap.containsKey(call)){
            counterMap.put(call, 1);
        }else {
            Integer c = counterMap.get(call) + duration;
            counterMap.put(call, c);
        }
        collector.ack(tuple);

    }

    @Override
    public void cleanup() {
        for(Map.Entry<String, Integer> entry : counterMap.entrySet()){
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
CallLogCounterBolt

 三、建立執行入口類,構建Topology

Storm拓撲基本上是一個Thrift結構。 TopologyBuilder類提供了簡單而容易的方法來建立複雜的拓撲。TopologyBuilder類具備設置spout (setSpout)和設置bolt (setBolt)的方法。最後,TopologyBuilder有createTopology來建立拓撲。使用如下代碼片斷建立拓撲 -
package com.jing.calllogdemo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class App {
    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        //設置spout
        builder.setSpout("spout", new CallLogSpout());
        //設置creator-bolt
        builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
        //設置countor-bolt
        builder.setBolt("counter-bolt", new CallLogCounterBolt()).
                fieldsGrouping("creator-bolt", new Fields("call"));

        Config config = new Config();
        config.setDebug(true);

        /*本地模式
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();

         */

        StormSubmitter.submitTopology("myTop", config, builder.createTopology());


    }
}
App

 

 

爲了開發目的,咱們可使用「LocalCluster」對象建立本地集羣,而後使用「LocalCluster」類的「submitTopology」方法提交拓撲。 「submitTopology」的參數之一是「Config」類的實例。「Config」類用於在提交拓撲以前設置配置選項。此配置選項將在運行時與集羣配置合併,並使用prepare方法發送到全部任務(spout和bolt)。一旦拓撲提交到集羣,咱們將等待10秒鐘,集羣計算提交的拓撲,而後使用「LocalCluster」的「shutdown」方法關閉集羣。完整的程序代碼以下 -
 
 
參考:

做者:raincoffee
連接:https://www.jianshu.com/p/7af9693d9ffc
來源:簡書
簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

 

生產環境的集羣上運行topology

1)修改提交方式,在代碼中

2)導出jar包 mvn

3)在linux上運行topologys

&>storm jar XXX.jar  full.class.name

相關文章
相關標籤/搜索