【From】 http://blog.csdn.net/w_x_z_/article/details/53316618java
pring Ingegration 提供了基於Spring的EIP(Enterprise Integration Patterns,企業集成模式)的實現。Spring Integration 主要解決的問題是不一樣系統之間交互的問題,經過異步消息驅動來達到系統交互時系統之間的鬆耦合。spring
Spring Integration 主要有Message、Channel、Message EndPoint組成。數組
Message多線程
Message是用來在不一樣部分之間傳遞的數據。Message有兩部分組成:消息體(payload)與消息頭(header)。消息體能夠是任何數據類型;消息頭表示的元數據就是解釋消息體的內容。異步
/** * A generic message representation with headers and body. * * @author Mark Fisher * @author Arjen Poutsma * @since 4.0 * @see org.springframework.messaging.support.MessageBuilder */ public interface Message<T> { /** * Return the message payload. */ T getPayload(); /** * Return message headers for the message (never {@code null} but may be empty). */ MessageHeaders getHeaders(); }
Channelide
在消息系統中,消息發送者發送消息到通道(Channel),消息接受者從通道(Channel)接收消息。post
一、頂級接口ui
(1) MessageChannelthis
MessageChannel 是Spring Integration消息通道的頂級接口:spa
public interface MessageChannel { /** * Constant for sending a message without a prescribed timeout. */ long INDEFINITE_TIMEOUT = -1; /** * Send a {@link Message} to this channel. If the message is sent successfully, * the method returns {@code true}. If the message cannot be sent due to a * non-fatal reason, the method returns {@code false}. The method may also * throw a RuntimeException in case of non-recoverable errors. * <p>This method may block indefinitely, depending on the implementation. * To provide a maximum wait time, use {@link #send(Message, long)}. * @param message the message to send * @return whether or not the message was sent */ boolean send(Message<?> message); /** * Send a message, blocking until either the message is accepted or the * specified timeout period elapses. * @param message the message to send * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT} * @return {@code true} if the message is sent, {@code false} if not * including a timeout of an interrupt of the send */ boolean send(Message<?> message, long timeout); }
MessageChannel 有兩大子接口,分別是PollableChannel (可輪詢)和SubscribableChannel(可訂閱)。咱們全部的消息通道類都是現實這兩個接口。
(2) PollableChannel
PollableChannel 具有輪詢得到消息的能力。
public interface PollableChannel extends MessageChannel { /** * Receive a message from this channel, blocking indefinitely if necessary. * @return the next available {@link Message} or {@code null} if interrupted */ Message<?> receive(); /** * Receive a message from this channel, blocking until either a message is available * or the specified timeout period elapses. * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}. * @return the next available {@link Message} or {@code null} if the specified timeout * period elapses or the message reception is interrupted */ Message<?> receive(long timeout); }
(3) SubscribableChannel
SubscribableChannel 發送消息給訂閱了MessageHanlder的訂閱者
public interface SubscribableChannel extends MessageChannel { /** * Register a message handler. * @return {@code true} if the handler was subscribed or {@code false} if it * was already subscribed. */ boolean subscribe(MessageHandler handler); /** * Un-register a message handler. * @return {@code true} if the handler was un-registered, or {@code false} * if was not registered. */ boolean unsubscribe(MessageHandler handler); }
二、經常使用消息通道
(1)、PublishSubscribeChannel
PublishSubscribeChannel容許廣播消息給全部訂閱者,配置方式以下:
/** * 容許廣播消息給全部訂閱者,當前消息通道的id爲publishSubscribeChannel * @return */ @Bean public PublishSubscribeChannel publishSubscribeChannel(){ PublishSubscribeChannel channel = new PublishSubscribeChannel(); return channel; }
其中,當前消息通道的id爲publishSubscribeChannel。
(2)、QueueChannel
QueueChannel容許消息接收者輪詢得到消息,用一個隊列(queue)接收消息,隊列的容量大小可配置,配置方式以下:
@Bean public QueueChannel queueChannel(){ QueueChannel channel = new QueueChannel(10); return channel; }
其中,QueueChannel構造參數10即爲隊列的容量。
(3)、PriorityChannel
PriorityChannel可按照優先級將數據存儲到隊列,它依據於消息的消息頭priority屬性,配置方式以下:
@Bean public PriorityChannel priorityChannel(){ PriorityChannel channel = new PriorityChannel(10); return channel; }
(4)、RendezvousChannel
RendezvousChannel確保每個接收者都接收到消息後再發送消息,配置方式以下:
@Bean public RendezvousChannel rendezvousChannel(){ RendezvousChannel channel = new RendezvousChannel(); return channel; }
(5) DirectChannel
DirectChannel是Spring Integration默認的消息通道,它容許將消息發送給爲一個訂閱者,而後阻礙發送直到消息被接收,配置方式以下:
@Bean public DirectChannel directChannel(){ DirectChannel channel = new DirectChannel(); return channel; }
(6)、ExecutorChannel
ExecutorChannel可綁定一個多線程的task executor,配置方式以下:
@Bean public ExecutorChannel executorChannel(){ ExecutorChannel channel = new ExecutorChannel(executor()); return channel; } @Bean public Executor executor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(25); taskExecutor.initialize(); return taskExecutor; }
三、通道攔截器
Spring Integration給消息通道提供了通道攔截器(ChannelInterceptor),用來攔截髮送和接收消息的操做.
ChannelInterceptor接口定義以下,咱們只須要實現這個接口便可:
public interface ChannelInterceptor { Message<?> preSend(Message<?> message, MessageChannel channel); void postSend(Message<?> message, MessageChannel channel, boolean sent); void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex); boolean preReceive(MessageChannel channel); Message<?> postReceive(Message<?> message, MessageChannel channel); void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex); }
經過以下代碼給全部的channel增長攔截器
channel.addInterceptor(someInterceptor);
Message EndPoint
消息端點(Message EndPoint)是真正處理消息的(Message)組件,它還能夠控制通道的路由。咱們可用的消息端點包含以下:
(1) Channel Adapter
通道適配器(Channel Adapter)是一種鏈接外部系統或傳輸協議的端點(EndPoint),能夠分爲入站(inbound)和出站(outbound)。
通道適配器是單向的,入站通道適配器只支持接收消息,出站通道適配器只支持輸出消息。
Spring Integration內置了以下的適配器:
RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI Twitter、XMPP、WebServices(SOAP、REST)、WebSocket
(2) Gateway
消息網關(Gateway)相似於Adapter,可是提供了雙向的請求/返回集成方式,也分爲入站(inbound)和出站(outbound)。
Spring Integration 對響應的Adapter都提供了Gateway。
(3) Service Activator
Service Activator 可調用Spring的Bean來處理消息,並將處理後的結果輸出到指定的消息通道。
(4) Router
路由(Router) 可根據消息體內容(Payload Type Router)、消息頭的值(Header Value Router) 以及定義好的接收表(Recipient List Router) 做爲條件,來決定消息傳遞到的通道。
(5) Filter
過濾器(Filter) 相似於路由(Router),不一樣的是過濾器不決定消息路由到哪裏,而是決定消息是否能夠傳遞給消息通道。
(6) Splitter
拆分器(Splitter)將消息拆分爲幾個部分單獨處理,拆分器處理的返回值是一個集合或者數組。
(7) Aggregator
聚合器(Aggregator)與拆分器相反,它接收一個java.util.List做爲參數,將多個消息合併爲一個消息。
(8) Enricher
當咱們從外部得到消息後,須要增長額外的消息到已有的消息中,這時就須要使用消息加強器(Enricher)。消息加強器主要有消息體
加強器(Payload Enricher)和消息頭加強器(Header Enricher)兩種。
(9) Transformer
轉換器(Transformer)是對得到的消息進行必定的轉換處理(如數據格式轉換).
(10) Bridge
使用鏈接橋(Bridge)能夠簡單的將兩個消息通道鏈接起來。