本文主要是解析SpoutOutputCollector源碼,順便分析該類中所涉及的設計模式–代理模式。
首先介紹一下Spout輸出收集器接口–ISpoutOutputCollector,該接口主要聲明瞭如下3個抽象方法用來約束ISpoutOutputCollector的實現類。接口定義與方法說明以下:java
/** * ISpoutOutputCollector:Spout輸出收集器接口 */ public interface ISpoutOutputCollector { /** * 改方法用來向外發送數據,它的返回值是該消息全部發送目標的taskID集合; * 參數: * streamId:消息Tuple將要被輸出到的流 * tuple:要輸出的消息,是一個Object列表 * messageId:輸出消息的標記信息,若是messageId被設置爲null,則Storm不會追蹤該消息, * 不然它會被用來追蹤所發出的消息處理狀況 */ List<Integer> emit(String streamId, List<Object> tuple, Object messageId); /** * 該方法與上面emit方法相似,區別在於: * 1.數據(消息)只由所指定taskId的Task接收;(這就意味着若是沒有下游節點接收該消息,則該消息就沒有被真正發送) * 2.該方法要求參數streamId所對應的流必須爲直接流,接收端的Task必須以直接分組的方式來接收消息, * 不然會拋出異常. */ void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); /** * 用來處理異常 */ void reportError(Throwable error); }
Storm提供了接口ISpoutOutputCollector的默認類SpoutOutputCollector,這個類其實是一個代理類,該類持有一個ISpoutOutputCollector類型的對象,全部的操做實際上都過該對象來實現的。SpoutOutputCollector定義以下:設計模式
public class SpoutOutputCollector implements ISpoutOutputCollector { /** * 持有SpoutOutputCollector要代理的對象 */ ISpoutOutputCollector _delegate; public SpoutOutputCollector(ISpoutOutputCollector delegate) { _delegate = delegate; } /** * 實現了接口中的emit方法,而且提供了它的幾個重載方法 * eg.若是不指定streamId,默認使用default,若是不指定messageId,則默認使用空(null) */ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId){ return _delegate.emit(streamId, tuple, messageId); } public List<Integer> emit(List<Object> tuple, Object messageId) { return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); } public List<Integer> emit(List<Object> tuple) { return emit(tuple, null); } public List<Integer> emit(String streamId, List<Object> tuple) { return emit(streamId, tuple, null); } /** * 實現了接口中的emitDirect方法,同時也提供了幾個重載方法,與上面emit方法一致. */ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { _delegate.emitDirect(taskId, streamId, tuple, messageId); } public void emitDirect(int taskId, List<Object> tuple, Object messageId) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { emitDirect(taskId, streamId, tuple, null); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, tuple, null); } /** * 處理異常方法的實現 */ @Override public void reportError(Throwable error) { _delegate.reportError(error); } }
PS:
代理模式主要分爲兩種:靜態代理和動態代理ide
靜態代理:
在程序運行前代理類與委託類的關係在運行前就肯定,即在程序運行前就已經存在代理類的字節碼文件了.
代理模式角色:
Subject(抽象主題角色):能夠是抽象類也能夠是接口,聲明瞭被委託角色和委託類共有的處理方法;
RealSubject(具體主題角色):又稱被委託角色、被代理角色,是業務邏輯的具體執行者;
ProxySubject(代理主題角色):又稱委託類、代理類,負責對真實角色的應用,
把全部抽象主題類定義的方法限制委託給具體主題角色來實現,而且在具體主題角色處理完畢先後作預處理和藹後處理.測試
靜態代理模式案例以下:this
//抽象主題 public interface Subject { public void process(String taskName); }
被代理角色:設計
public class RealSubject implements Subject { @Override public void process(String taskName) { System.out.println("正在執行任務:"+taskName); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
代理類:代理
public class ProxySubject implements Subject { //代理類持有一個委託類的對象引用 private Subject delegate; public ProxySubject(Subject delegate){ this.delegate=delegate; } @Override public void process(String taskName) { //預處理 this.before(); //將請求分派給委託類處理 delegate.process(taskName); //善後處理 this.after(); } private void before(){ System.out.println("預處理!"); } private void after(){ System.out.println("善後處理!"); } }
案例測試:orm
public class Test { public static void main(String[] args) { RealSubject subject = new RealSubject(); ProxySubject p = new ProxySubject(subject); p.process("排水"); } }
測試結果:對象
預處理! 正在執行任務:排水 善後處理!
靜態代理類的優缺點:
優勢:
業務類只需關注業務邏輯自己,這樣就保證了業務類的重用性.
缺點:
代理對象的一個接口只服務於一種類型的對象.當要代理的方法不少,就要爲每一種方法進行代理。所以靜態代理在程序規模變大時就沒法很好地勝任工做了.blog
動態代理:
代理類和委託類的關係在程序運行時才肯定的.動態代理類的源碼是在程序運行期間由JVM根據反射等機制動態生成,因此不存在代理類的字節碼文件.
動態代理模式案例以下:
public interface Service { //目標方法 public void process(); }
public class UserServiceImpl implements Service { @Override public void process() { System.out.println("用戶service處理"); } }
動態代理實現實例:
public class MyInvocatioHandler implements InvocationHandler { private Object target; public MyInvocatioHandler(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //System.out.println("-----before-----"); this.before(); Object result = method.invoke(target, args); // System.out.println("-----end-----"); this.after(); return result; } // 生成代理對象 public Object getProxy() { ClassLoader loader = Thread.currentThread().getContextClassLoader(); Class<?>[] interfaces = target.getClass().getInterfaces(); return Proxy.newProxyInstance(loader, interfaces, this); } private void before(){ System.out.println("預處理!"); } private void after(){ System.out.println("善後處理!"); } }
案列測試:
public class ProxyTest { public static void main(String[] args) { Service service = new UserServiceImpl(); MyInvocatioHandler handler = new MyInvocatioHandler(service); Service serviceProxy = (Service)handler.getProxy(); serviceProxy.process(); } }
測試結果:
預處理! 用戶service處理 善後處理!
動態代理的優缺點:
優勢:
接口中的全部方法都被轉移到調用處理器一個集中的方法中在方法「運行時」動態的加入,決定你是什麼類型,較靈活
缺點:
1. 與靜態代理相比,效率下降了
2. JDK動態代理只能對實現了接口的類進行代理
歡迎關注下面二維碼進行技術交流: