[轉] Spring Integration 系統集成

【From】 http://blog.csdn.net/w_x_z_/article/details/53316618java

 

pring Ingegration 提供了基於Spring的EIP(Enterprise Integration Patterns,企業集成模式)的實現。Spring Integration 主要解決的問題是不一樣系統之間交互的問題,經過異步消息驅動來達到系統交互時系統之間的鬆耦合。spring

Spring Integration 主要有Message、Channel、Message EndPoint組成。數組

 

Message多線程

Message是用來在不一樣部分之間傳遞的數據。Message有兩部分組成:消息體(payload)與消息頭(header)。消息體能夠是任何數據類型;消息頭表示的元數據就是解釋消息體的內容。異步

/**
 * A generic message representation with headers and body.
 *
 * @author Mark Fisher
 * @author Arjen Poutsma
 * @since 4.0
 * @see org.springframework.messaging.support.MessageBuilder
 */
public interface Message<T> {

    /**
     * Return the message payload.
     */
    T getPayload();

    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();

}

 

Channelide

在消息系統中,消息發送者發送消息到通道(Channel),消息接受者從通道(Channel)接收消息。post

 

一、頂級接口ui

(1) MessageChannelthis

MessageChannel 是Spring Integration消息通道的頂級接口:spa

public interface MessageChannel {

    /**
     * Constant for sending a message without a prescribed timeout.
     */
    long INDEFINITE_TIMEOUT = -1;


    /**
     * Send a {@link Message} to this channel. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     * To provide a maximum wait time, use {@link #send(Message, long)}.
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(Message<?> message);

    /**
     * Send a message, blocking until either the message is accepted or the
     * specified timeout period elapses.
     * @param message the message to send
     * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
     * @return {@code true} if the message is sent, {@code false} if not
     * including a timeout of an interrupt of the send
     */
    boolean send(Message<?> message, long timeout);

}

 

MessageChannel 有兩大子接口,分別是PollableChannel (可輪詢)和SubscribableChannel(可訂閱)。咱們全部的消息通道類都是現實這兩個接口。

 

(2) PollableChannel

PollableChannel 具有輪詢得到消息的能力。

public interface PollableChannel extends MessageChannel {

    /**
     * Receive a message from this channel, blocking indefinitely if necessary.
     * @return the next available {@link Message} or {@code null} if interrupted
     */
    Message<?> receive();

    /**
     * Receive a message from this channel, blocking until either a message is available
     * or the specified timeout period elapses.
     * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
     * @return the next available {@link Message} or {@code null} if the specified timeout
     * period elapses or the message reception is interrupted
     */
    Message<?> receive(long timeout);

}

 

(3) SubscribableChannel

SubscribableChannel 發送消息給訂閱了MessageHanlder的訂閱者

public interface SubscribableChannel extends MessageChannel {

    /**
     * Register a message handler.
     * @return {@code true} if the handler was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(MessageHandler handler);

    /**
     * Un-register a message handler.
     * @return {@code true} if the handler was un-registered, or {@code false}
     * if was not registered.
     */
    boolean unsubscribe(MessageHandler handler);

}

 

 二、經常使用消息通道

(1)、PublishSubscribeChannel

PublishSubscribeChannel容許廣播消息給全部訂閱者,配置方式以下:

    /**
     * 容許廣播消息給全部訂閱者,當前消息通道的id爲publishSubscribeChannel
     * @return
     */
    @Bean
    public PublishSubscribeChannel publishSubscribeChannel(){
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        return channel;
    }

 其中,當前消息通道的id爲publishSubscribeChannel。

 

(2)、QueueChannel

QueueChannel容許消息接收者輪詢得到消息,用一個隊列(queue)接收消息,隊列的容量大小可配置,配置方式以下:

    @Bean
    public QueueChannel queueChannel(){
        QueueChannel channel = new QueueChannel(10);
        return channel;
    }

 其中,QueueChannel構造參數10即爲隊列的容量。

 

(3)、PriorityChannel

PriorityChannel可按照優先級將數據存儲到隊列,它依據於消息的消息頭priority屬性,配置方式以下:

    @Bean
    public PriorityChannel priorityChannel(){
        PriorityChannel channel = new PriorityChannel(10);
        return channel;
    }

 

(4)、RendezvousChannel

RendezvousChannel確保每個接收者都接收到消息後再發送消息,配置方式以下:

    @Bean
    public RendezvousChannel rendezvousChannel(){
        RendezvousChannel channel = new RendezvousChannel();
        return channel;
    }

 

(5) DirectChannel

DirectChannel是Spring Integration默認的消息通道,它容許將消息發送給爲一個訂閱者,而後阻礙發送直到消息被接收,配置方式以下:

    @Bean
    public DirectChannel directChannel(){
        DirectChannel channel = new DirectChannel();
        return channel;
    }

 

(6)、ExecutorChannel

ExecutorChannel可綁定一個多線程的task executor,配置方式以下:

    @Bean
    public ExecutorChannel executorChannel(){
        ExecutorChannel channel = new ExecutorChannel(executor());
        return channel;
    }

    @Bean
    public Executor executor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }

 

三、通道攔截器

Spring Integration給消息通道提供了通道攔截器(ChannelInterceptor),用來攔截髮送和接收消息的操做.

ChannelInterceptor接口定義以下,咱們只須要實現這個接口便可:

 public interface ChannelInterceptor {

        Message<?> preSend(Message<?> message, MessageChannel channel);

        void postSend(Message<?> message, MessageChannel channel, boolean sent);

        void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

        boolean preReceive(MessageChannel channel);

        Message<?> postReceive(Message<?> message, MessageChannel channel);

        void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);

    }

 經過以下代碼給全部的channel增長攔截器

channel.addInterceptor(someInterceptor);

 

Message EndPoint

消息端點(Message EndPoint)是真正處理消息的(Message)組件,它還能夠控制通道的路由。咱們可用的消息端點包含以下:

 

(1) Channel Adapter

通道適配器(Channel Adapter)是一種鏈接外部系統或傳輸協議的端點(EndPoint),能夠分爲入站(inbound)和出站(outbound)。 
通道適配器是單向的,入站通道適配器只支持接收消息,出站通道適配器只支持輸出消息。

Spring Integration內置了以下的適配器:

RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket

 

(2) Gateway

消息網關(Gateway)相似於Adapter,可是提供了雙向的請求/返回集成方式,也分爲入站(inbound)和出站(outbound)。 
Spring Integration 對響應的Adapter都提供了Gateway。

 

(3) Service Activator

Service Activator 可調用Spring的Bean來處理消息,並將處理後的結果輸出到指定的消息通道。

 

(4) Router

路由(Router) 可根據消息體內容(Payload Type Router)、消息頭的值(Header Value Router) 以及定義好的接收表(Recipient List Router) 做爲條件,來決定消息傳遞到的通道。

 

(5) Filter

過濾器(Filter) 相似於路由(Router),不一樣的是過濾器不決定消息路由到哪裏,而是決定消息是否能夠傳遞給消息通道。

 

(6) Splitter

拆分器(Splitter)將消息拆分爲幾個部分單獨處理,拆分器處理的返回值是一個集合或者數組。

 

(7) Aggregator

聚合器(Aggregator)與拆分器相反,它接收一個java.util.List做爲參數,將多個消息合併爲一個消息。

 

(8) Enricher

當咱們從外部得到消息後,須要增長額外的消息到已有的消息中,這時就須要使用消息加強器(Enricher)。消息加強器主要有消息體 
加強器(Payload Enricher)和消息頭加強器(Header Enricher)兩種。

 

(9) Transformer

轉換器(Transformer)是對得到的消息進行必定的轉換處理(如數據格式轉換).

 

(10) Bridge

使用鏈接橋(Bridge)能夠簡單的將兩個消息通道鏈接起來。

相關文章
相關標籤/搜索