Spring Integration Framework 從系統文件中讀取配置,持久化數據到數據庫,發送信息到外部客戶端,發佈郵件,FTP平常的快照,執行其它的例行任務。java
咱們的應用程序要跟文件系統,數據庫系統,郵件系統,FTP服務等打交道。 咱們還須要考慮部署各類不一樣的適配器來知足咱們應用程序跟外部其它應用集成的須要。web
共享文件系統:兩個以上應用程序共享一個通用的文件系統,一個可能寫入,其它的讀取。 這時須要將發送者和接收者解耦。可是它短板在於性能,穩定性,以及對文件系統的依賴。正則表達式
單數據庫:應用程序共享單一數據庫。形成網絡延遲,鏈接鎖定問題算法
消息系統:要求發送和接收者解耦,發送消息的程序將一些數據封裝發送給一個消息中間體就不須要在關心它了。 一個消費消息的消費者不管什麼時候均可以開始它的工做流。好處在於消息能夠在這個過程當中被強化,轉換,路由,過濾,而後才送達終端處理。spring
消息模式:消息,信道,轉換器數據庫
pipes and filters 模式 mkonda$ cat just-spring-titles.txt | grep "Just Spring" | wc -l 這裏有三個終端處理 cat,grep,wc cat 命令顯示文件內容,這裏的顯示不是顯示到屏幕上,而是經過管道發送給grep命令,grep命令會獲取文件內容並從中查找 Just Spring字符串,而後將結果經過管道傳遞給wc,由它簡單的在屏幕上顯示結果數量。 這就是一個管道和過濾模型,其中 | 表示管道。編程
若是咱們瞭解JMS或者分佈式技術,那就應該據說過企業級消息。 咱們的應用程序在跟其它應用程序經過網絡進行交互能夠被看做企業應用程序。 咱們須要用有一個應用程序服務器來承載這些應用程序,並暴露服務接口給其它應用程序調用。跨域
Enterprise Integration Pattern:EIP 傳統的編程模型: 好比咱們設計一個從外部系統獲取交易的系統(好比咱們從一個文件中),須要按照以下步驟處理交易: 須要基於交易的類型(新建,撤銷,修正等)對交易進行分類 而後將交易按照類別進行處理和存儲 當一個交易被存儲後,必須有一個審計工具被通知。 咱們能夠將上面的設計編寫到一個組件中。數組
//Pseudo code緩存
public class TradesLoader { private List trades = null; .... public void start(){ trades = loadTrades(tradesFile); (for Trade t: trades){ processTrade(t); persistTrades(t); auditNotify(t); } } public void processTrade(Trade trade){ if (t.getStatus().equalsIgnoreCase("new")) { processNewTrade(); } else if (t.getStatus().equalsIgnoreCase("amend")) { processAmmendTrade(); } else { processOtherTrade(); } } public void loadTrades(File fromFile){ .. } public void persistTrades(Trade trade){ .. } public void auditNotify(Trade trade){ .. } public static void main(String[] args) { new TradesLoader().start(); } } 這是一個順序模型,組件跟整個業務流程高度的耦合。若是咱們想添加另一個工做流,好比咱們將對全部大級別交易發送通知 或者建立一個新任務來收集交易模型,咱們就須要從新構建這個流程,並寫添加更多的if-else語句。 這樣咱們就看出上面這個TradesLoader組件乾的太多了,而不僅僅是夾在交易。 因此咱們須要簡化這種設計,讓TradesLoader組件感到加載完Trades爲止結束。 在上面但進程處理中,TradesLoader首先是獲取Trade而後存儲到一個事先定義好的內部序列中。 相關的操做好比TradeProcessor,TadePersistor,TradeNotifier都將基於這個序列來完成整個工做流程。 獨立的消息模型: 上面的組件應用能夠被重構成TradesLoader在從文件中加載了Trades後,發佈Trades到一個容器數據結構,在消息模型中,這個容器 被稱爲destination 或者是 channel,而其它的處理組件都從這裏獲取Trade。而destination或者channel扮演了一個導管的角色。 Spring Integration 框架是基於Spring的核心組件開發出來的,知足此類模型的編程的優秀框架,它須要咱們提供專門的消息容器程序。 基本概念:消息,信道和端點,Messages,Channels和Endpoints 消息是數據的容器,信道是包含這些消息的地址,端點是一些鏈接到這些信道消費或者發佈消息的組件。 消息是在兩個應用程序之間攜帶信息的對象,它造成於一段,在另外一端被解析處理,消息的生產者或者發佈者發佈消息到一個信道, 消息的預訂者或者消費者鏈接該信道而後獲取消息,從消息中讀取其攜帶的數據,復原會相應的領域對象,進行相關業務處理。 解析一個消息: 消息由兩部分組成:payload和header payload至關於信的內容,是對它感興趣的一方須要處理的內容。 header是頭信息,至關於信封。 public interface Message { T getPayLoad(); MessageHeaders getHeaders(); } public final class MessageHeaders implements Map, Serializable { ... } 框架提供了一個Message接口的通用實現GenericMessage 咱們可使用工具類MessageBuilder來生成: // Create payload object Account a = new Account(); // creating a map with header properties Map accountProperties = new HashMap(); // Set our header properties accountProperties.put("ACCOUNT_EXPIRY","NEVER"); // Use MessageBuilder class to create a Message // and set header properties Message m = MessageBuilder.withPayload(a) .setHeader("ACCOUNT_EXPIRY", "NEVER") .build() 消息信道: 消息數據的容器,被稱爲信道,表示消息被髮送的位置。 框架中咱們定義了MessageChannel接口來描述它。 Spring Integration提供了聲明信道的模型,咱們不須要經過java類來定義它。 // declaratively creating a channel 同時提供了一些具體的實現,QueueChannel,PriorityChannel和RendezvousChannel等。 雖然它們各有不一樣可是底層的設計原則都是同樣的,表現爲一個端點地址。 端點Endpoints: 它們從一個輸入信道消費消息,或者從一個輸出信道發佈消息。也多是隻消費消息,或者只發送消息。 框架提供了不少即插即用的端點實現:Transformers,Splitters,Filters,Routers等。 還提供了一些端點適配器來鏈接像JMS,FTP,JDBC,Twitter等。 好比咱們有一個Service Activator endpoint 它是一個通用的端點,用於當一個消息抵達輸入信道時,就在一個bean上調用一個方法。 //bean to be invoked 這裏service-activator 端點會在消息抵達position-channel時會獲取消息並調用bean上的processNewPosition方法, 而完成這些所須要的代碼,框架已經幫忙實現了。 好比一個交易者須要一個web應用程序生成一個交易,發送到一個JMS目的地,在那裏這些交易會被另一個組件預訂,它們會 鏈接JMS目的地並獲取交易並處理它們。 咱們可使用一個適配器inbound-channel-adapter來從一個輸入JMS Queue獲取消息。 tcp://localhost:61616 // bean that would be invoked public class NewTradeProcessor { public void processNewTrade(Trade t){ // Process your trade here. System.out.println("Message received:"+m.getPayload().toString()); } } 測試類: public class NewTradeProcessorTest { private ApplicationContext ctx = null; private MessageChannel channel = null; // Constructor which instantiates the endpoints public NewTradeProcessorTest() { ctx = new ClassPathXmlApplicationContext("basics-example-beans.xml"); channel = ctx.getBean("trades-channel", MessageChannel.class); } private Trade createNewTrade() { Trade t = new Trade(); t.setId("1234"); ... return t; } private void sendTrade() { Trade trade = createNewTrade(); Message tradeMsg = MessageBuilder.withPayload(trade).build(); channel.send(tradeMsg, 10000); System.out.println("Trade Message published."); } public static void main(String[] args) { NewTradeProcessorTest test = new NewTradeProcessorTest(); test.sendTrade(); } } 經過信道咱們能夠解耦發送者和接收者,在咱們的管道和過濾模型中,Channel是管道。 咱們能夠經過信道和終端組合出複雜的集成方案。 消息信道: MessageChannel接口定義兩個主要發方法: boolean send(Message message); boolean send(Message message, long timeout) 第一個方法執行時必須等到消息發送成功纔會返回控制權,當一個消息沒有被髮送時,第二個方法會接管,拋出異常。 這裏的timeout變量能夠是0,正值或者負值。若是爲負值,則線程會被阻塞直到消息發佈成功。 若是是0,發送方法不管成功與否都會馬上返回控制權,若是正值,則會在發送失敗時等待相應的時間拋出錯誤和異常。 它沒有定義任何的接收消息的方法,接收消息大量依賴接收語法Point-to-Point(p2p)或者publish/Subscribe(Pub/Sub). 在P2P模式下,只有一個消息接收者接收消息,即便有多個接收者能鏈接該信道,可是隻能有一個接收,並且可使隨機選擇。 在Pub/Sub模式下,消息是發送給全部預訂了該消息的消費者,就是說消息被拷貝後分發給全部預訂者。 這裏有個概念叫作message buffering,消息緩存,消息被根據配置按照序列保存到內存或者持久存儲區。 客戶端選擇信道依賴於分發模式(P2P,Pub/Sub)和緩衝或非緩衝情景。 這裏有兩個接口來處理接收端: PollableChannel 和 SubscribleChannel 它們兩個都擴展子MessageChannel,故自動繼承了Send方法。 點到點模式: 消費者僅須要使用任意的PollableChannel接口實現便可: public interface PollableChannel extends MessageChannel { // This call blocks the thread until it receives a message Message receive(); // This call will wait for a specified timeout before // throwing a failure if message is not available Message receive(long timeout); } 框架提供了該接口的三個實現:QueueChannel,PriorityChannel和RendezvousChannel QueueChannel具有消息緩衝功能,PriorityChannel和RendezvousChannel是QueueChannel更出色的實現版本。 它們擴展了P2P和緩衝特性。 Spring會在應用程序啓動時建立它們。 public class QueueChannelTest{ private ApplicationContext ctx=null; private MessageChannel qChannel=null; public QueueChannelTest(){ ctx=new ClassPathXmlApplicationContext("channels-beans.xml"); qChannel = ctx.getBean("q-channel",MessageChannel.class); } public void receive(){ // This method receives a message, however it blocks // indefinitely until it finds a message //Message m =((QueueChannel)qChannel).receive(); // This method receives a message, however it exists // within the 10 seconds even if doesn't find a message Message m = ((QueueChannel) qChannel).receive(10000); System.out.println("Payload: " + m.getPayload()); } } channels-beans.xml ------------------------------------------------------------------------------------------------------------------- Pub/Sub模型: 使用SubscribableChannel,每一個消息都要被廣播給全部註冊的訂閱者。 public interface SubscribableChannel extends MessageChannel{ //to subscribe a MessageHandler for handling the message boolean subscribe(MessageHandler handler); //unsubscribe boolean unsubscribe(MessageHandler handler); } public interface MessageHandler{ //this method is invoked when a fresh message appears on the channel void handleMessage(Message message) throws MessagingException; } public class ChannelTest{ private ApplicationContext ctx=null; private MessageChannel pubSubChannel=null; public ChannelTest(){ ctx=new ClassPathXmlApplicationContext("channels-beans.xml"); pubSubChannel = ctx.getBean("pubsub-channel",MessageChannel.class); } public void subscribe(){ ((PublishSubscribeChannel)pubSubChannel).subscribe(new TradeMessageHandler()); } class TradeMessageHandler implements MessageHandler{ public void handleMessage(Message message) throws MessagingException{ System.out.println("Handling Message:"+ message); } } } -------------------------------------------------------------------------------------------------------------- Queue Channel: 該信道展示出點對點特性,只有一個消費者能夠接收到消息,可是能夠建立多個消費者。 該信道同時還支持緩衝消息,由於它使用一個序列結構將消息保存到內存中。 定義了一個100個元素的序列信道。若是省略則會建立一個無限容量的信道,capacity 的會被設置爲Integer.MAX_VALUE。 在沒有客戶消費消息時,消息序列可能會被塞滿,消息發送者也會被阻塞,直到序列有空間可用或者超時發生。 QueueChannel實現的事First In First Out(FIFO)順序。數據會被保存到java.util.concurrent.LinkedBlockingQueue. QueueChannel 還提供了一個purge方法來淨化序列,用MessageSelector來預約義條件 public List> purge(MessageSelector selector){} 若是給purge方法傳入null參數,將會清空整個序列。 ------------------------------------------------------------------------------------------------------------- Priority Channel: 屬於QueueChannel的一個子類,只是添加了一個消息優先級設置。若是咱們須要發送高優先級消息,那麼使用PriorityChannel是不錯的選擇。 咱們使用MessageHeader的PRIORITY屬性設置優先級。 public void publishPriorityTrade(Trade t){ Message tradeMsg = MessageBuilder.withPayload(t). setHeader(MessageHeades.PRIORITY,10).build(); priorityChannel.send(tradeMsg,10000); System.out.println("The Message is published successfully"); } 若是咱們須要進一步的控制優先級,咱們須要經過實現Comparator> 來提供比較實現。 public class AccountComparator implements Comparator>{ @Override public int compare(Message msg1,Message msg2){ Account a1 = (Account)msg1.getPayload(); Account a2 = (Account)msg2.getPayload(); Integer i1 = a1.getAccountType(); Integer i2 = a1.getAccountType(); return i1.compareTo(i2); } } 咱們須要讓框架直到咱們定義的比較器, ------------------------------------------------------------------------------------------------------------- Rendezvous Channel: 也是QueueChannel的一個子類,展現的是點對點特性。它實現的是一個零容量的序列。 在底層它使用SynchronousQueue數據結構,這就意味着任什麼時候間點都只能有一個消息存在於信道中。 當消息生產者發送一個消息給它時,它會被鎖定直到消息被消費者消費掉。 一樣的,消費者也會被鎖定等待消息出如今信道中。 當咱們但願接收一個請求回覆時,RendezvousChannel是個理想的信道。 客戶端推送一個在消息頭中添加要求回覆的頭信息的請求, public void sendTradeToRendezvous(Trade t) { Message tradeMsg = MessageBuilder.withPayload(t). etHeader(MessageHeaders.REPLY_CHANNEL, "replyChannel").build(); rendezvousChannel.send(tradeMsg, 10000); System.out.println(t.getStatus() + " Trade published to a Rendezvous channel"); } ----------------------------------------------------------------------------------------------- PublishSubscribe Channel: 若是咱們須要將消息發送給多個消費者,則須要使用SubscribeChannel, 這裏沒有定義receive 方法。 由於信息接收時有MessageHandler來處理的。 public class PubSubTest{ MessageHandler handler = new TradeMessageHandler(); private ApplicationContext ctx = null; private PublishSubscribeChannel pubSubChannel = null; ... // subscribe to the channel public void subscribe() { boolean handlerAdded = pubSubChannel.subscribe(handler); System.out.println("Handler added?" + handlerAdded); } // Unsubscribe using the same channel and handler references. public void unsubscribe() { boolean handlerRemoved = pubSubChannel.unsubscribe(handler); System.out.println("Handler removed?" + handlerRemoved); } //Handler to handle the messages class TradeMessageHandler implements MessageHandler { public void handleMessage(Message message) throws MessagingException { System.out.println("Handling Message:" + message); } } } 當消息出如今信道時,它會調用註冊的handler並傳入消息,進行處理。 --------------------------------------------------------------------------------------------------------- Direct Channel: 混合了P2P和Pub/Sub特點,它實現了SubscribableChannel接口,因此咱們須要一個MessageHandler的具體實現來訂閱它。 消息能夠被訂閱它的處理器處理,可是隻有一個訂閱者會獲取消息,呈現了P2P特色。 即便你註冊了多個訂閱者,信道也只會交付給它們中的一個。 該框架用於循環廣播策略從衆多預訂者中選擇一個接收消息。 消息的生產者和消費者都運行在同一個線程中,對於跨域多個資源事務的企業級應用來講很是有用。 若是多個處理器訂閱一個信道,有兩個問題,要選擇哪一個處理器處理消息和若是選擇了一個處理器不能處理消息問題。 一個是load-balancer另外一個是failover 屬性。 load-balancer 標記選擇一個合適的加載策略來選擇處理器。 failover屬性是布爾標記,若是設置爲true,若是被選中的處理拋出異常,它會讓後續的處理器處理消息,默認值爲true。 由於DirectChannel將處理訂閱者的任務委託給MessageDispatcher, load-balancer默認設置爲round-robin,若是要忽略加載平衡策略,則直接能夠將load-balancer的值設置爲 none ------------------------------------------------------------------------------------------------------------ Executor Channel: 實現SubscribableChannel接口,相似DirectChannel,除了是由java.uti.concurrent.Executor 實例來派發消息。 在DirectChannel實現裏,消息發送線程從頭至尾徹底掌控,而在ExecutorChannel中發送線程發送完就結束了。 消息消費是由派發器獨立的線程處理,派發器經過消費者調用消息處理的執行器。 // define the executor 默認設置: ------------------------------------------------------------------------------------------------------------- Null Channel: 是PollableChannel,主要用於測試目的。其發送方法老是返回true,指定的操做都是成功的。而接收方法老是獲取一個空的消息。 底層代碼沒有建立任何序列,可是send操做返回true,receive返回null。 // This is the framework class implementation public class NullChannel implements PollableChannel { // send will always return true public boolean send(Message message) { if (logger.isDebugEnabled()) { logger.debug("message sent to null channel: " + message); } return true; } // receive will return null always public Message receive() { if (logger.isDebugEnabled()) { logger.debug("receive called on null channel"); } return null; } ... } 總之,消息信道是分隔生產者和消費者的主要組件。 =============================================================================== Endpoints 端點 消息終端是從消息框架中分離業務邏輯的組件。它們對於隱藏消息細節很是重要。 它們負責鏈接應用程序組件到消息信道來發送和接收消息。 Spring Integration提供的終端有Service Activator,Channel Adapter,Message Bridge,Gateway,Transformer,Filter,Router等。 首先咱們須要在配置文件中引入相應的命名空間定義: xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd" Service Activator: 是一個通用的端點,它在一個消息抵達某個信道時調用某個bean上定義的方法執行。 若是這個方法有返回值,該返回值將會被髮送到一個輸出信道,前提是已經配置了輸出信道。 使用service-activator 元素配置activator,並設置input-channel和ref的bean。 經過上面的定義,任何到達positions-channel的消息都會被傳遞給NewTradeActivator 並經過由method屬性指定的方法來處理它。 若是目標bean只有一個方法,那麼這裏method屬性就不必設置。框架會自動將該惟一方法解析爲服務方法並調用它。 NewTradeActivator類做爲服務的入口 public class NewTradeActivator { Position position = .. public void processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... } } 該方法返回非空值時,返回值會被包裹在一個Message中併發送給一個output-channel. 若是你想在處理完後發送一個回覆給其它信道,咱們就給方法定義返回值。 // Return value will be wrapped in a Message // and sent to an output channel public Position processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... return position; } 咱們能夠沒有定義output-channel,只要咱們的處理方法帶有返回值,框架將會使用消息頭中名爲replyChannel的屬性來發送回復。 若是在消息頭中沒有發現replyChannel屬性,則會拋出異常。 服務方法能夠有Message或者一個Java對象做爲參數,若是使用一個Java對象做爲參數,消息中的負載就會被取出來傳遞給message。 由於傳入消息是一個java對象,該模式將不會綁定咱們的Spring API,因此是個更好的選項。 上例中一個Position對象被包裹到一個Message中傳遞給信道。 ---------------------------------------------------------------------------------------------------------- Message Bridge: 消息橋是一個簡單的端點,它鏈接不一樣的消息模型或者適配器。 經常使用的消息橋有綁定一個P2P模型信道到一個Pub/Sub模型。 在P2P模型中,一個PollableChannel被端點使用,反之,whereas,一個PublishSubscribableChannel用於Pub/Sub模型。 在配置文件中使用bridge元素定義它, 上例中,bridge會獲取來自輸入信道的消息發佈到輸出信道中。 輸入信道是PublishSubscribeChannel,而輸出信道是QueueChannel 要完成上面的設計,須要一個service activator掛到輸出信道上。 在消息經過橋端點到達輸出信道時,PositionReceiver 會被調用。 --------------------------------------------------------------------------------------------------------- Message Enricher: 一個消息加強組件能夠給一個輸入的消息添加額外的信息併發送給下一個消費者。 好比一個交易由一些編碼信息,好比安全ID,或者客戶帳戶代碼組成。 爲了保持可信度咱們可能在不一樣的階段爲其附加不一樣的信息數據。 框架提供了兩種方式來加強消息:Header Enricher和Payload Enricher Header Enricher: 咱們還能夠設置一些預約義的屬性,好比priority,reply-channel,error-channel等。 框架還支持經過容許header-enricher的頭屬性來引用一個bean來使用payload設置頭屬性。 這裏的ID是在TradeEnricher的幫助下從payload中獲取數據。 public class TradeEnricher { public String enrichHeader(Message m) { Trade t = (Trade)m.getPayload(); return t.getId()+"SRC"; } } Payload Enricher: 使用enricher標籤訂義添加或者加強到payload的部分的內容。 一樣的enricher須要input-channel中的消息,而後將它傳遞給request-channel並等待回覆。 應該有一些其餘組件監聽request-channel 來加強消息。 在加強了消息的payload後,該組件會發布回覆給回覆信道。回覆信道是經過header屬性聲明在消息內部的。 一旦加強器enricher得到了回覆,它會經過表達式將加強數據設置給屬性。 上面的配置中,Price被傳入in-channel,Price消息沒有任何數據,enricher將它傳遞給enricher-req-channel並等待回覆。 獲取消息並加強其信息後返回Price,返回值被髮布到reply-channel. enricher一旦接收到來自reply-channel的消息就繼續處理, 添加額外的屬性,好比price和instrument到消息併發送它們到output-channel。 public void publishPrice(){ //Create a Price object with no values Price p = new Price(); //note the reply-channel as header property Message msg = MessageBuilder.withPayload(p) .setHeader(MessageHeaders.REPLY_CHANNEL, "reply-channel") .build(); channel.send(msg,10000); System.out.println("Price Message published"); } public class PriceEnricher{ public Price enrichHeader(Message m){ Price p = (Price)m.getPayload(); p.setInstrument("IBM"); p.setPrice(111.11); return p; } } Enricher 組件符合Gateway模式。 ------------------------------------------------------------------------------ Gateway:網關 有兩種類型的網關,同步網關和異步網關 同步網關中,消息調用會被阻塞直到消息處理完成,而異步網關,消息的調用時非阻塞的。 Synchronous Gateway: 寫一個網關的第一步就是定義一個接口來描述跟消息系統的交互方法。 好比咱們定義ITradeGateway接口包含一個單一的processTrade方法 public interface ITradeGateway { public Trade processTrade(Trade t); } 配置網關: 上面配置在應用程序上下文加載時,會用默認請求和回覆信道建立一個網關端點,網關有一個service-interface屬性 該屬性執行咱們的ITradeGateway接口。框架的GatewayProxyFactoryBean 爲服務接口建立一個代理(因此你不用爲其提供任何實現) 代理可以使用提供的信道來處理客戶端輸入和輸出的請求。 因此若是客戶端調用一個processTrade方法,也是由代理來完成。 它發佈一個帶有Trade對象的消息到trades-in-channel,代理會阻塞調用直到從trades-out-channel收到回覆爲止。 該回復會被傳回客戶端。會有另一個組件來獲取來trades-in-channel的消息根據業務需求進行處理。 客戶端代碼: public GatewayEndpointTest() { ... public GatewayEndpointTest() { ctx = new ClassPathXmlApplicationContext("endpoints-gateway-beans.xml"); // obtain our service interface tradeGateway = ctx.getBean("tradeGateway",ITradeGateway.class); } public void publishTrade(Trade t) { // call the method to publish the trade! Trade it = tradeGateway.processTrade(t); System.out.println("Trade Message published (Reply)."+it.getStatus()); } public static void main(String[] args) { GatewayEndpointTest test = new GatewayEndpointTest(); Trade t = new Trade(); test.publishTrade(t); } } 咱們從應用程序上下文中獲取一個tradeGateway bean並調用processTrade方法,徹底不依賴於消息框架。 爲了完成這個實例,咱們能夠配置一個Service Activator來從trades-in-channel(跟代理髮布消息是同一個信道)中獲取消息,並傳遞迴復給trades-out-channel(跟代理監聽回覆的信道相同) public class TradeProcessor { public Trade receiveTrade(Trade t) { System.out.println("Received the Trade via Gateway:"+t); t.setStatus("PROCESSED"); return t; } } Asynchronous Gateway: 要獲取異步效果,那麼服務接口的返回值須要改變,如今咱們讓它返回一個Future對象。 import java.util.concurrent.Future; public interface ITradeGatewayAsync { public Future processTrade(Trade t); } public void publishTrade(Trade t) { Future f = tradeGateway.processTrade(t); try { Trade ft = f.get(); } catch (Exception e) { .. } } ------------------------------------------------------------------------------- Delayer:延遲器 用於在發送者和接收者之間引入延遲。 全部的進入in-channel的消息將會被延遲5秒後傳送給out-channel,若是default-delay設置爲0或者負數時,會馬上轉發消息。 咱們還能夠經過消息的header字段來定義每一個消息的延遲週期,爲此咱們須要使用delay-header-name來讓框架知道。 上面配置指定全部具備MSG_DELAY屬性的消息都將按照本身設置的頭字段值進行延遲,沒有該屬性設置的消息按照系統配置的默認值延遲。 ---------------------------------------------------------------------------------- Spring 表達式: Spring集成支持Spring表達式(SpEL)定義。 咱們能夠用表達式來在消息頭和負載中求取屬性值。 這裏headers 屬性引用MessageHeaders,因此咱們可使用headers.property_name語法來查屬性值。 相似的,消息的payload能夠做爲payload屬性來用,因此咱們可使用點號查詢payload對象屬性值。 另外端點好比Transformer,Filter,Service Activator以及Splitter都支持SpEL。 ---------------------------------------------------------------------------------------- 腳本支持: 可使用框架擴展來支持腳本語言,咱們可使用框架支持的語言編寫腳本,而後被端點調用。 事實上咱們可使用實現了JSR-223的任何語言。Groovy,Python/Jython,Ruby/JRuby,JavaScript等。 下面例子是端點從in-channel中獲取消息而後傳遞給position-transformer.groovy腳本: 在該腳本的執行上下文中,腳本能夠經過headers和payload變量訪問消息的MessageHeaders和Payload。 咱們還能夠直接將腳本以CDATA元素的形態直接嵌入到配置文件中。 ------------------------------------------------------------------------------------------ Consumers:消費者 咱們的信道有兩種一種是pollable可輪詢的,一種是subscribable可訂閱的,一樣的咱們的終端消費者也可分爲 Polling Consumer和Event-Driven客戶。 輪詢消費者基於輪詢配置爲消息輪詢信道,它是由客戶程序驅動的。 事件驅動消費者預訂了一個可預訂信道,當消息到達時能夠被異步通知。 Polling Consumers: 其特色是定時的爲消息進行輪詢,框架提供了PollingConsumer類來完成這項工做。 在實例化它是須要給構造函數傳入一個可輪詢信道和一個消息處理器。 消息處理器是一個處理髮布到該信道的消息的接口定義。 private MessageHandler positionsHandler = null; private QueueChannel positionsChannel = null; ... // Instantiating a PollingConsumer PollingConsumer consumer = new PollingConsumer(positionsChannel, positionsHandler); Message m = channel.receive();//or other receive methods System.out.println("Payload: " + m.getPayload()); 消息處理器: public class PositionsHandler implements MessageHandler { public void handleMessage(Message message) throws MessagingException { System.out.println("Handling a message: "+ message.getPayload().toString()); } } public class PositionsPollingConsumer { private PollingConsumer consumer = null; private PositionsHandler positionsHandler = null; public PositionsPollingConsumer(ApplicationContext ctx, QueueChannel positionsChannel) { //instance of handler positionsHandler = new PositionsHandler(); // now create the framework's consumer consumer = new PollingConsumer(positionsChannel, positionsHandler); //You must set the context, or else an error will be thrown consumer.setBeanFactory(ctx); } public void startConsumer() { consumer.start(); } } 調用: PositionsPollingConsumer ppc = new PositionsPollingConsumer(ctx, positionsChannel); ppc.startConsumer(); 使用觸發器輪詢: 上面例子的調用不是輪詢方式的,咱們須要它按時方式輪詢,則須要使用框架的Triggers來完成。 框架爲咱們提供了兩種類型的觸發器:PeriodicTrigger 和 CronTrigger PeriodicTrigger是按照固定的時間間隔輪詢 CronTrigger則是基於Unix的cron表達式進行輪詢,在任務計劃須要複雜需求時,該方式更加靈活。 一旦選中了觸發器,咱們就須要實例化它並將它安裝到consumer,咱們還能夠設置它的initialDelay和fixedRate來進一步控制輪詢。 PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000); // let the polling kick in after half a second periodicTrigger.setInitialDelay(500); // fixed rate polling? periodicTrigger.setFixedRate(false); 其中initialDelay設置輪詢只有在超過期間週期開始輪詢 fixedRate是一個布爾變量,指定輪詢是否須要在規定的時間間隔以內,若是設置爲true,則若是當前的消息處理超過了輪詢週期, 則輪詢會處理下一個消息。 CronTrigger可讓consumer作更多的輪詢,好比咱們須要在工做日的午夜叫醒工做,使用corn表達式能夠這樣複雜的情型。 Cron表達式表示爲由空格分開的字段,有六個字段,每一個字段表示時間不一樣的內容。聲明一個表達式表現咱們的時間需求 // start polling all weekdays at exactly one minute past midnight String cronExpression="* 01 00 * * MON-FRI"; cronTrigger = new CronTrigger(cronExpression); ------------------------------------------------------------------------------------------------------- Event-Driven Consumers: 訂閱消息的消費者被歸類爲Event-Driven Consumers,框架定義該類型消費者爲EventDrivenConsumer 它的基本特徵是等待某人分發到達信道的消息,SubscribableChannel信道支持這類消費。 private EventDrivenConsumer consumer = null; private PositionsHandler positionsHandler = null; private ApplicationContext ctx = null; public PositionsEventDrivenConsumer(ApplicationContext ctx,PublishSubscribeChannel positionsChannel) { positionsHandler = new PositionsHandler(); // instantiate the event driven consumer consumer = new EventDrivenConsumer(positionsChannel, positionsHandler); consumer.setBeanFactory(ctx); } public void startConsumer() { // EventDrivenConsumer exposes start method consumer.start(); } ===================================================================================================== Transformer:轉換器 並非全部的程序都能理解它們消費的數據,好比一個生產者使用Java對象做爲它的負載製造一條消息,而消息消費者對非Java對象 數據感興趣,好比XML或者名值對。爲了幫助消息生產者和消費者進行交流,咱們就須要爲他們定義轉換器。 框架提供了轉換器組件,好比Object-to-String,Object-to-Map等。 框架內建的轉換器: 好比對象到字符串,map,或者JSON格式,框架直接提供。 String轉換器: 這裏須要注意須要做爲payload的POJO對象重寫toString()方法,以免出現意外結果。 Map Transformer: Object --〉Map: Map--〉Object: 序列化和反序列化轉換器: JMS的消息在發送時必須序列化並在接收時反序列化,Payload序列化,即將POJO轉換爲字節數組,反序列化是將字節數組轉換爲POJO對象。 JSON 轉換器: XML轉換器:使用了Spring的Object-to-XML(OXM)框架。 org.springframework.oxm.Marshaller 和 org.springframework.oxm.Unmarshaller 咱們須要XML命名空間來訪問XML轉換器, 聲明瞭空間後,咱們就可使用marshalling-transformer元素來讀取一個輸入信道的消息。 格式化成XML格式而後傳遞迴給輸出信道。 這裏設置的可選參數result-type決定者結果類型。 框架有兩個內建的結果類型:javax.xml.transfor.dom.DOMResult 和 org.springframework.xml.transform.StringResult DOMResult是默認結果類型。 若是咱們但願定義本身的結果類型: 這裏TradeResultFactory 有一個方法createResult實現,它繼承自ResultFactory public class TradeResultFactory implements ResultFactory { public Result createResult(Object payload) { System.out.println("Creating result ->"+payload); //create your own implementation of Result return new TradeResult(); } } XPath 轉換器: 使用XPath表達式解碼XML payload,要求輸入信道傳入XML負載的消息。 咱們能夠經過以下方式建立XML格式payload消息發佈到trades-in-channel信道: private String createNewTradeXml() { return ""; } 自定義轉換器: public class TradeMapTransformer { public Map transform(Trade t) { Map tradeNameValuesMap = new HashMap(); tradeNameValuesMap.put("TRADE_ID", t.getId()); tradeNameValuesMap.put("TRADE_ACCOUNT", t.getAccount()); ... return tradeNameValuesMap; } } 接下來就是讓框架知道咱們定義的轉換類: 這裏須要爲transformer元素聲明的內容有: 一個輸入信道,一個輸出信道,還有就是轉換器實現類。 String轉換器: POJS-to-String: public class PojoToStringTransformer { private final String tradeString = "TRADE_ID=%s, TRADE_ACCOUNT=%s, TRADE_SECURITY=%s, TRADE_DIRECTION=%s, TRADE_STATUS=%s" ; public String transform ( Trade t ) { return String.format( tradeString, t.getId(), t.getAccount(), t.getSecurity(), t.getDirection(), t.getStatus() ) ; } } 配置轉換器: ------------------------------------------------------------------------------------------------------ 使用標籤: 咱們可使用框架的@Transformer 聲明標籤來引入轉換器bean。 component-scan容許容器在transformers包裏掃描聲明標記了的bean。 此時AnnotatedTradeMapTransformer 類會被實例化: @Component public class AnnotatedTradeMapTransformer { @Transformer public Map transform(Trade t) { Map tradeNameValuesMap = new HashMap(); .... return tradeNameValuesMap; } } @Transformer 標記的方法會被調用。 ========================================================================= 工做流組件: 消息應用有時須要一些額外的組件,好比路由,聚合aggregation,排序sequencing等。 一個應用程序可能有特定的條件來路由信息到多個信道或者分解信息而後聚合他們作更深刻的處理。 Spring Integration框架提供了Filters,Routes,Aggregators,和 Splitters等能夠直接使用的組件。 Filters: 消費者有不一樣的消息消費需求,Spring Integration框架使用Filters和配置的條件來決定哪一個應用程序應該接收消息。 public class NewTradeFilter { public boolean isNewTrade(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("new")); } } 使用框架的MessageSelector: public class CancelTradeFilter implements MessageSelector{ public boolean accept(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("cancel")); } } 非公用的Filter咱們能夠採用內嵌聲明: 使用標籤: @Component public class AnnotatedNewTradeFilter { @Filter public boolean isTradeCancelled(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("cancel")); } } ---------------------------------------------------------------------- 丟棄消息: 框架容許在消息不符合過濾條件時,過濾器拋出異常或者轉發另外的信道。 爲了能拋出異常,咱們添加throw-exception-on-rejection屬性到過濾器元素。 或者我在過濾器中裝配一個信道來接收這些被丟棄的消息,咱們使用discard-channel屬性來設置丟棄信道 ---------------------------------------------------------------------------- 路由器: 工做流有一個需求就是根據特定條件將消息發送給一個或者多個信道。 一個路由器組件能夠用於分配消息到多個目的地。 路由器會從一個信道獲取消息並基於payload或者headers內容從新投遞到相關的信道。 filter和router不一樣,Filter基於簡單的布爾測試決定消息是否被髮送,Router基於內容來決定如何轉發消息。 在Filter裏,消息只有兩個方向能夠去,繼續向前傳遞或者被丟棄,使用Filter時,消息一個消息可能會也可能不會出如今輸出信道中。 而使用Router,則單個消息能夠被髮送給一個或者多個信道。 框架提供了一些內建的路由器:基於消息負載內容的PayloadTypeRouter,和基於消息header值的HeaderValueRouter。 PayloadTypeRouter:基於payload的類型決定將消息路由到哪一個信道。 路由組件會附加到一個輸入信道,將獲取消息負載類型,並據此分配它們到特定類型需求的其它信道。 假設咱們有一個信道從外部應用流入Account和Trade消息,咱們但願分離他們到兩個不一樣的信道,Accounts進入accounts-channel ,Trades進入trades-channel 爲此咱們能夠在輸入信道all-in-channel中裝配一個payload-type-router組件,而後使用其mapping屬性來設置期待的類型和其相應的 分配信道。 自定義路由器: 咱們定義路由時須要獲取消息並解析消息的負載或者頭信息,而後根據結果返回相應的信道名稱。 public class BigTradeRouter { public String bigTrade(Message message){ Trade t = message.getPayload(); // check if the trade is a big one and if it is // send it to a separate channel to handle them if(t.getQuantity() > 1000000) return "big-trades-channel"; // else send a normal channel return "normal-trades-channel"; } } 在配置文件中配置自定義的路由器: -------------------------------------------------------------------------------------- Recipient List Router 接收表路由器 分配給某信道的消息定義在一個接收列表中: 好比下面的 一個Trade消息被分配到三個子流信道:persistor-channel 來保存全部輸入的Trade消息 trades-channel用於處理Trade, audit-channel則用於審計目的。 ---------------------------------------------------------------------------------------- Unqualified Messages:不合格消息處理 對於一些沒法經過特定路由邏輯的消息,框架既能夠拋出異常也能夠把它們推到一個默認的信道里。 default-output-channel ... resolution-required 屬性用在路由器上跟默認的輸出信道關聯。它會基於信道的ID來解析任意消息信道。 若是resolution-required設置爲true,可是default-output-channel沒有,則會拋出異常。 使用路由器標籤:@Router @Component public class AnnotatedBigTradeRouter { @Router public String bigTrade(Message message) { Trade t = message.getPayload(); if (t.getQuantity() > 10000) return "big-trades-channel"; return "trades-stdout"; } } ---------------------------------------------------------------------------------------------- Splitters: 一般用於切分消息到小塊,用於更小的自定客戶邏輯。 好比將一個大的消息負載切分紅小塊來並行處理。 框架提供了splitter元素來定義它。 咱們能夠經過自定義POJO實現自定義邏輯或者擴展框架提供的AbstractMessageSplitter抽象類實現其splitMessage()方法 來定義切割器。 自定義實現,通常須要咱們定義一個簡單的POJO,而後實現切分算法。 好比一個正常的Trade和一些加密數據被傳入,需求是獲取加密數據到另外的對象EncryptedTrade 新建立的EncryptedTrade和原來的Trade都將發送給輸出信道。 咱們就可使用切分器來處理。 Trade和EncryptedTrade都繼承自ITrade,其中ITrade只是一個沒有任何內容定義的標記接口。 public class Trade implements ITrade{ private String encryptedMsg = null; ... public String getEncryptedMsg() { return encryptedMsg; } public void setEncryptedMsg(String encryptedMsg) { this.encryptedMsg = encryptedMsg; } ... } public class EncryptedTrade implements ITrade{ private String encryptedMsg = null; public EncryptedTrade(String encryptedMsg) { this.encryptedMsg = encryptedMsg; } public String getEncryptedMsg() { ... } public void setEncryptedMsg(String encryptedMsg) { ... } } 其切分過程是從Trade對象中獲取encryptedMessage用於構造EncryptedMessage對象。 咱們定義CustomEncryptedTradeSplitter封裝上面的邏輯實現: public class CustomEncryptedTradeSplitter{ public List splitMyMessageToTrades(Message message) { List trades = new ArrayList(); TradeImpl t = (TradeImpl)message.getPayload(); //Create a new object from the payload EncryptedTrade et = new EncryptedTrade(t.getEncryptedMsg()); trades.add(t); trades.add(et); System.out.println("Splitting message done, list: "+trades); return trades; } } ------------------------------------------------------------------------------- 使用AbstractMessageSplitter抽象類定義: public class EncryptedTradeSplitter extends AbstractMessageSplitter{ @Override protected Object splitMessage(Message message) { .... return trades; } } 標籤 @Splitter 聲明讓框架知道咱們定義了一個用於切分器的bean。 而後咱們必須爲方法提供@Splitter標籤: @Component public class AnnonatedEncryptedTradeSplitter{ @Splitter public List splitMyMessageToTrades(Message message) { .. } } 該方法返回一個對象集合,每一個對象都會被包裹在Message中做爲payload 每一個子消息都會被印上相同的集合ID:CORRELATION_ID 在SEQUENCE_SIZE參數設置母消息要被切分紅子消息的個數。 在每一個子消息上印上單獨的SEQUENCE_NUMBER Received a message:[Payload=Trade [...][Headers={sequenceNumber=1, correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}] Received a message:[Payload=EncryptedTrade[...]][Headers={sequenceNumber=2, correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}] 其中correlationId相同說明這兩個子消息相互關聯,表示CORRELATION_ID。 sequenceSize 說明由幾個子消息組成,由SEQUENCE_SIZE 頭屬性指定的,。 sequenceNumber 表示每一個子消息的序列碼。 以上三個屬性將在消息聚合時發揮重要做用。 ---------------------------------------------------------------------------------- Aggregation 消息聚合 裝配多個消息來建立一個父消息,與Splitter是一個相反的過程。 在裝配開始以前要求全部的參與裝配的子消息必須都到達。裝配它們要基於相關性和其發佈策略。 public class TradeAggregator { public ITrade aggregateTrade(List childTrades) { ... } } ------------------------------------------------------------------------------------------------- 策略: 聚合不會單獨工做,這種成對算法叫作策略。 這對聚合行爲很是重要。 一個父消息被切分紅大量的子消息,接下來聚合器須要等待這些子消息所有到達,而後才能聚合它們再次成爲一個父消息。 聚合器會遵循某種算法來開始和結束它的工做。這種算法是以correlation和release策略的算法提供給聚合器的。 聚合器使用這些策略來追蹤流入的子消息並對它們進行聚合處理。 Correlation Strategy:關聯策略 定義了用於分組消息的鍵。默認分組是基於CORRELATION_ID ,全部具備相同CORRELATION_ID的消息會被放到同一個籃子裏等待聚合。 框架提供了HeaderAttributeCorrelationStrategy能夠直接使用,也能夠自定義本身的策略。 定義本身的策略能夠經過實現CorrelationStrategy接口或者本身建立本身的POJO。 public class MyCorrelationStrategy implements CorrelationStrategy { public Object getCorrelationKey(Message message) { // implement your own correlation key here // return .. } } 若是是本身定義的POJO則須要在配置時指定方法名:而且要求方法輸入Message參數,返回一個Object對象。 Release Strategy:發佈策略 該策略指定集合後的信息在那個點上被髮送或者發佈。 除了設置了release-on-expire標記狀況外,它會等待信號發送。 默認的策略是 SequenceSizeReleaseStrategy,它實現了ReleaseStrategy接口。 它會檢查經過SEQUENCE_SIZE分組的消息,好比SEQUENCE_SIZE是10,只有當接收全部10個消息而且序列碼在1~10返回內時, 策略會觸發一個信號給聚合器。 跟CorrelationStrategy 相似,咱們能夠經過實現ReleaseStrategy或者建立本身的POJO類來實現。 public class MyReleaseStrategy implements ReleaseStrategy { public boolean canRelease(MessageGroup group) { // implement your strategy here return false; } } 若是自定義POJO則須要給實現方法輸入一個java.util.List對象,返回一個布爾型返回值。 使用release-strategy 和 release-strategy-method 配置: -------------------------------------------------------------------------------------------- 消息存儲: 聚合器會暫存消息直到相關聯的消息都收到。即便有一子消息沒收到,聚合都不能發佈(除非存儲過時) 這就要求聚合器有一個地方存儲消息。 框架提供了相應的選項來該聚合器提供消息存儲。 有兩種存儲選項:內存或者外部數據庫 內存存儲是默認的,它會經過java.util.Map收集消息存儲到內存中。 框架提供了message-store 屬性來引用相應的消息存儲。 若是是默認的內存存儲,則能夠忽略。 下列配置是一個數據庫存儲: ------------------------------------------------------------------------------------------ Resequencer 排序: 消息系統一個重要的特徵就是消息的順序。 儘管排序會傷害性能,可是某些狀況下仍是須要強調排序的,好比在災難恢復時,要求消息按照原來的順序回覆。 Resequencer組件可以對接收的消息進行排序。 它會做用於SEQUENCE_NUMBER頭字段來追蹤順序。 若是咱們將其 release-partial-sequences 標記設置爲true,它會在已收到消息就發佈,而不會等待全部的分組消息成員都到達。 ================================================================================================= Adapter 適配器: 框架提供了不少能夠直接使用的適配器,它們分爲進項適配器和出項適配器。 Inbound adapters獲取文件或者數據庫結果集,Outbound Adapter則是從信道中獲取消息而後轉換它們成爲文件傳輸到一個文件系統中, 或者 轉換成數據庫記錄保存的數據庫中。 這一切的基礎就是文件適配器,其它適配器工做方式與之相似。 File Adapters: 用於從不一樣的文件系統中獲取或者拷貝文件,而後轉換成框架的Message,發佈到一個信道,反之。 框架支持使用文件空間的聲明模型,它還提供了一些類來讀取和寫入文件操做,可是咱們推薦使用命名空間。 使用Namespace file 提供了不少直接使用的元素定義。 在使用以前咱們須要首先引入該Namespace ... 框架爲file提供了兩個適配器用於讀取和寫入文件。 inbound-channeladapter 元素用於讀取文件並將它們做爲File 負責的消息發佈到一個信道。 outbound-channel-adapter 元素用於從信道中獲取消息的負載的File,將其寫入到文件系統。 上面的配置設置了適配器以一秒爲頻次讀取指定文件內容,發佈到files-out-channel信道。 這裏簡單設置了輸出適配器,從信道files-out-channel中獲取消息而後在控制檯打印他們。 File 適配器有一些參數能夠設置: 組織重複讀取文件的設置: prevent-duplicates 該標記設置只做用於每次會話,若是讀取重啓則設置失效。 過濾:使用FileListFilter接口的實現 來作。框架提供了一個AcceptOnceFileListFilter,它在當前會話中只接收一個文件一次。 咱們能夠經過實現FileListFilter來自定義更多過濾: public class PositionsFilter implements FileListFilter { public List filterFiles(Position[] files) { List filteredList = new ArrayList(); // implement your filtering logic here return filteredList; } } 爲了防止重複讀取文件,咱們可使用filename-pattern 和 filename-regex屬性來阻止: ... 這樣適配器只獲取擴展名爲pos的文件,咱們還能夠經過正則表達式加以規範。 文件鎖定: 咱們可使用框架提供了FileLocker接口實現來鎖定文件防止其它進程訪問。 咱們能夠添加本身的自定義鎖: ------------------------------------------------------------------------------------ 獨立文件讀取器: 框架提供FileReadingMessageSource 類,它實現了框架的MessageSource接口,定義了receive()方法。 這是全部須要輪詢消息的基礎接口,返回值是Message對象,該對象包含java.io.File做爲負載。 public class StandaloneFileAdapterTest { // set the directory from where the files need to be picked up File directory = new File("/Users/mkonda/dev/ws"); public void startStandaloneAdatper() { FileReadingMessageSource src = new FileReadingMessageSource(); src.setDirectory(directory); Message msg = src.receive(); System.out.println("Received:"+msg); } public static void main(String[] args) { StandaloneFileAdapterTest test = new StandaloneFileAdapterTest(); test.startAdatper(); } } // declaring the framework's class as a bean private void startAdapterUsingDeclaredBeanRef() { ctx = new ClassPathXmlApplicationContext("adapters-file-beans.xml"); fileReader = ctx.getBean("fileReader", FileReadingMessageSource.class); // now you got the instance, poll for msgs Message msg = fileReader.receive(); System.out.println("Message received from the bean:" + msg); } ----------------------------------------------------------------------------------------------- Outbound Adapter:出項適配器 filewriter 適配器就是從信道中獲取消息而後把它們寫入文件系統。 在file命名空間下使用outbound-channel-adapter 元素來指定出項適配器。 這裏定義的適配器從positions-file-channel 信道獲取消息,而後寫入到 directory屬性指定的目錄中。 綜合入項和出項適配器: ----------------------------------------------------------------------------------------- 獨立的文件適配器: 使用standalone 類更加直觀,使用一個要寫入文件的位置目錄實例化 FileWritingMessageHandler // set the directory File directory = new File("/Users/mkonda/dev/ws/tmp"); .. private void startStandaloneWriter() { // fetch the channel for incoming feed outChannel = ctx.getBean("files-channel",PublishSubscribeChannel.class); handler = new FileWritingMessageHandler(directory); // subscribe to the incoming feed outChannel.subscribe(handler); } ------------------------------------------------------------------------------------------ FTP 適配器: 咱們使用File Transfer Protocol來進行遠程文件獲取和本地文件上載。 咱們的框架提供了inbound和outbound信道適配器。 輸入信道適配器鏈接到一個FTP Server來獲取遠程文件並將它們做爲消息Message的負載。 輸出信道適配器鏈接信道,消費消息,並把消息的負載寫入到遠程服務器目錄。 這兩個適配器均可以使用ftp命名空間下的inbound-channel-adapter 和 outbound-channeladapter 配置,在配置它們時一個前提條件是鏈接配置。 Session Factory:會話工廠 適配器應該要知道它所要鏈接的服務器的詳細細節,包括用戶名和密碼。 框架的DefaultFtpSessionConnectionFactory類提供了這些內容。 咱們須要在配置文件中聲明它,並設置相應的屬性值。 而後將該bean引用給session-factory屬性。 ... 有了會話工廠,咱們就能夠定義FTP適配器了。 Inbound FTP 適配器 使用會話工廠提供的鏈接遠程文件系統,並輪詢文件。若是發現文件,它會獲取文件並以其做爲消息的負載建立消息Message 而後發送給指定的信道進行進一步處理。 ... 組件使用Session 工廠鏈接遠程服務器,從remote-directory指定的文件目錄獲取文件,並把它們包裹成消息發佈到指定信道。 咱們能夠經過 filename-pattern, filename-regex等設置獲取文件的規則。 這裏的local-directory指定的目錄是組件在開始輪詢遠程文件以前會先檢查本地這個目錄。 一旦全部的本地文件都被髮布了,纔會開始對遠程文件進行輪詢和傳輸。 咱們能夠定義本身的文件過濾類,使用filter屬性指定到配置文件中。 Outbound FTP 適配器: 用於建立一個經過FTP發佈消息到遠程文件系統的終端。 ----------------------------------------------------------------------------------------------- Caching Session:緩存會話 框架在雙向信道適配器器上建立一個FTP 會話池來優化網絡訪問。 咱們能夠經過屬性cache-session設置爲false,來關閉它。 ----------------------------------------------------------------------------------------------- JMS 適配器: 框架提供了輸入和輸出適配器來跨外部消息系統接收和發送消息。 輸入適配器能夠從一個JMS目標地(topic 或者queue)獲取消息,而後發佈他們到本地信道。 輸出適配器能夠將本地信道負載轉換成JMS消息發佈到JMS目的地(topic或者queue). 相關的適配器元素定義在jms命名空間下: Inbound 適配器:接收消息 從消息系統接收消息可能很複雜,取決因而由消費客戶端驅動仍是消息提供者驅動。 客戶端會基於某種規則輪詢消息,服務器也會在消息到達時直接發送給客戶端(也就是消息驅動或者事件驅動)。 同步消費者: inbound-channel-adapter負責從JMS服務器獲取消息,經過對端點的配置來鏈接JMS Server,獲取消息,發佈消息到一個本地的信道。 這是一種拉消費,在底層,它使用JmsTemplate的receive方法來拉消息。咱們也能夠提供一個JmsTemplate實例或者同時提供 connectionFactory和destination。 tcp://localhost:61616 鏈接工廠封裝鏈接外部JMS提供者的詳細信息鏈接信息,簡單的定義爲一個bean,並使用適配器的connection-factory屬性 裝配到適配器。若是你把該bean的名字定義成connectionFactory,則不須要指定它,適配器會自動找該名字的bean注入。 咱們使用ActiveMQ做爲提供者,brokerURL指向本地的ActiveMQ服務器。 這裏須要注意的是JMS Destination對象,它實質上是JMS技術裏的一個Queue,適配器鏈接本地ActiveMQ服務,檢查POSITIONS_QUEUE 獲取發現的消息,發佈它到本地應用程序信道positions-channel. Message-Driven Consumers:消息驅動消費 該情形是服務端基於訂閱狀況來驅動的消費類型。message-driven-channel-adapter 元素定義它,消費者須要一個 Spring MessageListener容器或者一個connectionFactory和destination的組合。 這裏須要將Spring的對象轉換爲JMS Message,或者將Message轉換回Spring對象, extract-payload屬性用於轉換消息的Payload 關於消息的Payload轉換: 咱們須要使用轉換器從JMS Message中取出payload,而後放到本地message中。框架爲咱們提供了SimpleMessageConerter 它會將內容轉換爲咱們須要的消息payload, 若是JMS是一個TextMessage,會轉換爲String,若是是ByteMessage,它會轉換爲bytes。 須要注意的是隻有屬性extract-payload設置爲true時,轉換器纔會啓動,默認爲true。 咱們還能夠經過message-converter屬性指定自定義的轉換器。 ----------------------------------------------------------------------------------------------------- 發佈消息:輸出適配器 它的任務是從信道獲取消息併發布到JMS Queue或者Topic中。 底層咱們使用JmsSendingMessageHandler,相反的當 extract-payload設置爲true時,適配器會轉換信道的payload到JMS Message 內容。 ============================================================================================================== JDBC 適配器: 一樣分爲進項適配器和出向適配器,進向適配器從數據庫中獲取數據並將結果集做爲Message負載發佈到本地信道。 出向適配器讀取信道的消息數據保存到數據庫中。 進向JDBC適配器: 負責讀取數據集而後轉換成消息,jdbc命名空間下inbound-channel-adapter用於建立這類端點。 適配器提供一個SQL查詢和一個目的信道,同時還定義一個Datasource實例,用它來提供相關數據庫的鏈接設置。 上面的設置使用query查詢ACCOUNTS表狀態爲NEW的數據而後轉換爲Message發佈到resultset-channel信道。 這裏會將整個查詢結果集List做爲一個消息的payload,記錄的類型依賴於咱們行映射策略。 有時候咱們不但願輪詢結果包含重複的結果,框架提供了update語句追加到每次查詢上。 咱們每次拉數據時,咱們會使用select查詢中的特定設置更新記錄,來避免獲取以更新的數據。 好比咱們只但願獲取新建立的Account記錄,因此咱們能夠更新名爲POLLED的一列。 -------------------------------------------------------------------------------------------------- 出向JDBC適配器: 用於在數據庫中執行SQL查詢,而這查詢語句是有從輸入信道的消息裏獲取的內容構建的。 因此,出向適配器監聽消息信道,獲取消息,抽取消息中相關的值,構造查詢語句並在數據庫上執行該語句。 好比每一個出如今信道trade-persistence-channel的Trade消息應該被保存。 正常狀況下,咱們會寫一個消息消費終端來獲取每一個消息而後用持久化機制來說消息保存到數據庫。 然而,Spring Integration替咱們幹了這些。 咱們只須要配置一個出向適配器: 這裏比較有趣的是query參數的設置,咱們可使用payload鍵標記來規範參數。 每個輸入的消息都會有一個Map類型的payload負載,從這裏面咱們能夠查詢ID,ACCOUNT等 同時咱們還可使用headers這個Map值。 qyery="insert into TRADE t(ID,ACCOUNT,INSTRUMENT,EXPIRY) values(:payload[TRADE_ID], :payload[TRADE_ACCOUNT], :payload[TRADE_INSTRUMENT], :headers[EXPIRY])"> Map消息的建立代碼: public Message> createTradeMessage(){ Map tradeMap = new HashMap(); tradeMap.put("ID", "1929303d"); tradeMap.put("ACCOUNT", "ACC12345"); //.. // Create a Msg using MessageBuilder Message> tradeMsg = MessageBuilder.withPayload(tradeMap).build(); return tradeMsg; } 如此,只要消息一到達persistence-channel,它就會被適配器獲取並自動在數據庫上執行構造的SQL。