JStorm與Storm源碼分析(五)--SpoutOutputCollector與代理模式

本文主要是解析SpoutOutputCollector源碼,順便分析該類中所涉及的設計模式–代理模式。 
首先介紹一下Spout輸出收集器接口–ISpoutOutputCollector,該接口主要聲明瞭如下3個抽象方法用來約束ISpoutOutputCollector的實現類。接口定義與方法說明以下:java

/**
 * ISpoutOutputCollector:Spout輸出收集器接口
 */
public interface ISpoutOutputCollector {
    /**
     * 改方法用來向外發送數據,它的返回值是該消息全部發送目標的taskID集合;
     * 參數:
     * streamId:消息Tuple將要被輸出到的流
     * tuple:要輸出的消息,是一個Object列表
     * messageId:輸出消息的標記信息,若是messageId被設置爲null,則Storm不會追蹤該消息,
     * 不然它會被用來追蹤所發出的消息處理狀況
     */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    /**
     * 該方法與上面emit方法相似,區別在於:
     * 1.數據(消息)只由所指定taskId的Task接收;(這就意味着若是沒有下游節點接收該消息,則該消息就沒有被真正發送)
     * 2.該方法要求參數streamId所對應的流必須爲直接流,接收端的Task必須以直接分組的方式來接收消息,
     * 不然會拋出異常.
     */
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    /**
     * 用來處理異常
     */
    void reportError(Throwable error);
}

Storm提供了接口ISpoutOutputCollector的默認類SpoutOutputCollector,這個類其實是一個代理類,該類持有一個ISpoutOutputCollector類型的對象,全部的操做實際上都過該對象來實現的。SpoutOutputCollector定義以下:設計模式

public class SpoutOutputCollector implements ISpoutOutputCollector {
    /**
     * 持有SpoutOutputCollector要代理的對象
     */
    ISpoutOutputCollector _delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }
    /**
     * 實現了接口中的emit方法,而且提供了它的幾個重載方法
     * eg.若是不指定streamId,默認使用default,若是不指定messageId,則默認使用空(null)
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId){
        return _delegate.emit(streamId, tuple, messageId);
    }

    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    public List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }

    public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }
    /**
     * 實現了接口中的emitDirect方法,同時也提供了幾個重載方法,與上面emit方法一致.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }

    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }

    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, tuple, null);
    }
    /**
     * 處理異常方法的實現
     */
    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}

PS: 
代理模式主要分爲兩種:靜態代理和動態代理ide

靜態代理: 
在程序運行前代理類與委託類的關係在運行前就肯定,即在程序運行前就已經存在代理類的字節碼文件了. 
代理模式角色: 
Subject(抽象主題角色):能夠是抽象類也能夠是接口,聲明瞭被委託角色和委託類共有的處理方法; 
RealSubject(具體主題角色):又稱被委託角色、被代理角色,是業務邏輯的具體執行者; 
ProxySubject(代理主題角色):又稱委託類、代理類,負責對真實角色的應用, 
把全部抽象主題類定義的方法限制委託給具體主題角色來實現,而且在具體主題角色處理完畢先後作預處理和藹後處理.測試

靜態代理模式案例以下:this

//抽象主題
public interface Subject {
    public void process(String taskName);
}

被代理角色:設計

public class RealSubject implements Subject {
    @Override
    public void process(String taskName) {
        System.out.println("正在執行任務:"+taskName);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

代理類:代理

public class ProxySubject implements Subject {
    //代理類持有一個委託類的對象引用
    private Subject delegate;
    public ProxySubject(Subject delegate){
        this.delegate=delegate;
    }
    @Override
    public void process(String taskName) {
        //預處理
        this.before();
        //將請求分派給委託類處理
        delegate.process(taskName);
        //善後處理
        this.after();
    }
    private void before(){
        System.out.println("預處理!");
    }
    private void after(){
        System.out.println("善後處理!");
    }
}

案例測試:orm

public class Test {
    public static void main(String[] args) {
        RealSubject subject = new RealSubject();
        ProxySubject p = new ProxySubject(subject);
        p.process("排水");
    }
}

測試結果:對象

預處理!
正在執行任務:排水
善後處理!

靜態代理類的優缺點: 
優勢: 
業務類只需關注業務邏輯自己,這樣就保證了業務類的重用性. 
缺點: 
代理對象的一個接口只服務於一種類型的對象.當要代理的方法不少,就要爲每一種方法進行代理。所以靜態代理在程序規模變大時就沒法很好地勝任工做了.blog

動態代理: 
代理類和委託類的關係在程序運行時才肯定的.動態代理類的源碼是在程序運行期間由JVM根據反射等機制動態生成,因此不存在代理類的字節碼文件.

動態代理模式案例以下:

public interface Service {
    //目標方法 
    public void process();
}
public class UserServiceImpl implements Service {
    @Override
    public void process() {
         System.out.println("用戶service處理");  
    }
}

動態代理實現實例:

public class MyInvocatioHandler implements InvocationHandler {
    private Object target;

    public MyInvocatioHandler(Object target) {
        this.target = target;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        //System.out.println("-----before-----");
        this.before();
        Object result = method.invoke(target, args);
       // System.out.println("-----end-----");
        this.after();
        return result;
    }
    // 生成代理對象
    public Object getProxy() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<?>[] interfaces = target.getClass().getInterfaces();
        return Proxy.newProxyInstance(loader, interfaces, this);
    }

    private void before(){
        System.out.println("預處理!");
    }
    private void after(){
        System.out.println("善後處理!");
    }
}

案列測試:

public class ProxyTest {
    public static void main(String[] args) {
        Service service = new UserServiceImpl();
        MyInvocatioHandler handler = new MyInvocatioHandler(service);
        Service serviceProxy = (Service)handler.getProxy();
        serviceProxy.process();
    }
}

測試結果:

預處理!
用戶service處理
善後處理!

動態代理的優缺點: 
優勢: 
接口中的全部方法都被轉移到調用處理器一個集中的方法中在方法「運行時」動態的加入,決定你是什麼類型,較靈活 
缺點: 
1. 與靜態代理相比,效率下降了 
2. JDK動態代理只能對實現了接口的類進行代理

歡迎關注下面二維碼進行技術交流: 

相關文章
相關標籤/搜索