Spring Integration

The Cafe Sample(小賣部訂餐例子)

    小賣部有一個訂飲料服務,客戶能夠經過訂單來訂購所須要飲料。小賣部提供兩種咖啡飲料
        LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)。每種又都分冷飲和熱飲
    整個流程以下:
        1.有一個下訂單模塊,用戶能夠按要求下一個或多個訂單。
        2.有一個訂單處理模塊,處理訂單中那些是關於訂購飲料的。
        3.有一個飲料訂購處理模塊,處理拆分訂購的具體是那些種類的飲料,把具體須要生產的飲料要求發給生產模塊
        4.有一個生產模塊,進行生產。
        5.等生成完成後,有一個訂單確認模塊(Waiter),把訂單的生成的飲料輸出。

        
    這個例子利用Spring Integration實現了靈活的,可配置化的模式集成了上述這些服務模塊。

Spring Integration提供兩種模式的工做方式(Annotation和XML)
先來看一下XML方式,進行示例的開發:
配置文件以下:
<? xml version="1.0" encoding="UTF-8" ?>
< beans:beans  xmlns ="http://www.springframework.org/schema/integration"
    xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans
="http://www.springframework.org/schema/beans"
    xmlns:stream
="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation
="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
            http://www.springframework.org/schema/integration/stream
            http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd"
>

        
<!--  首先來配置一個GateWay組件,提供消息的發送和接收。接口Cafe,提供一個void placeOrder(Order order);方法
        該方法標記了@Gateway(requestChannel="orders"), 實現向orders隊列實現數據的發送
         
-->
    
< gateway  id ="cafe"  service-interface ="org.springframework.integration.samples.cafe.Cafe" />

        
<!--  訂單Channel  -->
    
< channel  id ="orders" />
    
<!--  實現Splitter模式, 接收 orders隊列的消息,調用orderSplitter Bean的split方法,進行消息的分解
    並把分解後的消息,發送到drinks隊列.
     
-->
    
< splitter  input-channel ="orders"  ref ="orderSplitter"  method ="split"  output-channel ="drinks" />

        
<!--  飲料訂單Channel,處理飲料的類別  -->
    
< channel  id ="drinks" />
    
<!--  實現Router模式,接收 drinks隊列的消息, 並觸發 drinkRouter Bean的 resolveOrderItemChannel方法
        由在 resolveOrderItemChannel該方法的返回值(String--隊列名稱)表示把消息路由到那個隊列上
    
-->
    
< router  input-channel ="drinks"  ref ="drinkRouter"  method ="resolveOrderItemChannel" />
        
         
<!--  冷飲生產Channel 最大待處理的數據量爲 10 -->
    
< channel  id ="coldDrinks" >
        
< queue  capacity ="10" />
    
</ channel >
    
<!--  定義一個服務處理器,其做用是定義一個消息接收隊列 codeDrinks,一但收到消息,則
    觸發 barista Bean的 prepareColdDrink方法, 再把 prepareColdDrink方法的值,封成Message的
    payLoad屬性,把消息再發送到preparedDrinks隊列, 
-->
    
< service-activator  input-channel ="coldDrinks"  ref ="barista"
                       method
="prepareColdDrink"  output-channel ="preparedDrinks" />

         
<!--  熱飲生產Channel 最大待處理的數據量爲 10 -->
    
< channel  id ="hotDrinks" >
        
< queue  capacity ="10" />
    
</ channel >
    
<!--  定義一個服務處理器,其做用是定義一個消息接收隊列 hotDrinks,一但收到消息,則
    觸發 barista Bean的 prepareHotDrink 再把 prepareColdDrink方法的值,封成Message的
    payLoad屬性,把消息再發送到preparedDrinks隊列, 
-->
    
< service-activator  input-channel ="hotDrinks"  ref ="barista"
                       method
="prepareHotDrink"  output-channel ="preparedDrinks" />
        
<!--  定義最終進行生產的消息隊列  -->
    
< channel  id ="preparedDrinks" />
    
<!--  實現 aggregator 模式, 接收 preparedDrinks 消息, 並觸發 waiter Bean的prepareDelivery方法
     再把處理好的數據,發送到 deliveries隊列 
-->
    
< aggregator  input-channel ="preparedDrinks"  ref ="waiter"
                method
="prepareDelivery"  output-channel ="deliveries" />

        
<!--  定義一個 stream 適配器,接收 deliveries隊列的消息後,直接輸出到屏幕 -->
    
< stream:stdout-channel-adapter  id ="deliveries" />

    
< beans:bean  id ="orderSplitter"
                class
="org.springframework.integration.samples.cafe.xml.OrderSplitter" />

    
< beans:bean  id ="drinkRouter"
                class
="org.springframework.integration.samples.cafe.xml.DrinkRouter" />

    
< beans:bean  id ="barista"  class ="org.springframework.integration.samples.cafe.xml.Barista" />

    
< beans:bean  id ="waiter"  class ="org.springframework.integration.samples.cafe.xml.Waiter" />

</ beans:beans >

咱們來看一下總體服務是怎麼啓動的
    
    首先咱們來看一下CafeDemo這個類,它觸發下定單操做
org.springframework.integration.samples.cafe.xml.CafeDemo
 1  public   class  CafeDemo {
 2 
 3       public   static   void  main(String[] args) {
 4           //// 加載Spring 配置文件 "cafeDemo.xml"
 5          AbstractApplicationContext context  =   null ;
 6           if (args.length  >   0 ) {
 7              context  =   new  FileSystemXmlApplicationContext(args);
 8          }
 9           else  {
10              context  =   new  ClassPathXmlApplicationContext( " cafeDemo.xml " , CafeDemo. class );
11          }
12           // 取得 Cafe實列
13          Cafe cafe  =  (Cafe) context.getBean( " cafe " );
14           // 準備 發送100條消息(訂單)
15           for  ( int  i  =   1 ; i  <=   100 ; i ++ ) {
16              Order order  =   new  Order(i);
17               //  一杯熱飲  參數說明1.飲料類型 2.數量 3.是不是冷飲(true表示冷飲)
18              order.addItem(DrinkType.LATTE,  2 false );
19               //  一杯冷飲  參數說明1.飲料類型 2.數量 3.是不是冷飲(true表示冷飲)
20              order.addItem(DrinkType.MOCHA,  3 true );
21               // 下發訂單,把消息發給 orders 隊列
22              cafe.placeOrder(order);
23          }
24      }
25 
26  }

下面是Cafe接口的源代碼
public   interface  Cafe {

    
// 定義GateWay, 把消息發送到 orders 隊列, Message的payLoad屬性,保存 order參數值
    @Gateway(requestChannel = " orders " )
    
void  placeOrder(Order order);

}

OrderSplitter 源代碼
 1  public   class  OrderSplitter {
 2 
 3       // 接收 從 orders隊列接收的 order 消息後,調用 order.getItems方法
 4       // 進行訂單的分解, 返回的List<OrderItem>可會,被拆分爲多個消息後(Message.payLoad),發到指定隊列
 5       public  List < OrderItem >  split(Order order) {
 6           return  order.getItems();
 7      }
 8 
 9  }
10 

OrderSplitter.split把消息拆分後,變成多個消息,發送到 drinks隊列.由drinkRouter進行消息的接收。
1  public   class  DrinkRouter {
2 
3       // 從 drinks隊列的消息後,根據orderItem的屬性,選擇路由到不一樣的隊列 coldDrinks或hotDrinks
4       public  String resolveOrderItemChannel(OrderItem orderItem) {
5           return  (orderItem.isIced())  ?   " coldDrinks "  :  " hotDrinks " ;
6      }
7 
8  }

下面看一下,若是是一杯冷飲,則消息發送到 coldDrinks隊列
接收根據配置,由barista Bean的prepareColdDrink方法接收消息後,進行處理
若是是一杯熱飲,則消息發送到 hotDrinks隊列
接收根據配置,由barista Bean的prepareHotDrink方法接收消息後,進行處理
 1  public   class  Barista {
 2 
 3       private   long  hotDrinkDelay  =   5000 ;
 4 
 5       private   long  coldDrinkDelay  =   1000 ;
 6 
 7       private  AtomicInteger hotDrinkCounter  =   new  AtomicInteger();
 8 
 9       private  AtomicInteger coldDrinkCounter  =   new  AtomicInteger();
10 
11 
12       public   void  setHotDrinkDelay( long  hotDrinkDelay) {
13           this .hotDrinkDelay  =  hotDrinkDelay;
14      }
15 
16       public   void  setColdDrinkDelay( long  coldDrinkDelay) {
17           this .coldDrinkDelay  =  coldDrinkDelay;
18      }
19 
20       // 處理熱飲訂單,並生成Drink冷料
21       public  Drink prepareHotDrink(OrderItem orderItem) {
22           try  {
23              Thread.sleep( this .hotDrinkDelay);
24              System.out.println(Thread.currentThread().getName()
25                       +   "  prepared hot drink # "   +  hotDrinkCounter.incrementAndGet()  +   "  for order # "
26                       +  orderItem.getOrder().getNumber()  +   " "   +  orderItem);
27               return   new  Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
28                      orderItem.getShots());
29          }  catch  (InterruptedException e) {
30              Thread.currentThread().interrupt();
31               return   null ;
32          }
33      }
34 
35       // 處理冷飲訂單,並生成Drink冷料
36       public  Drink prepareColdDrink(OrderItem orderItem) {
37           try  {
38              Thread.sleep( this .coldDrinkDelay);
39              System.out.println(Thread.currentThread().getName()
40                       +   "  prepared cold drink # "   +  coldDrinkCounter.incrementAndGet()  +   "  for order # "
41                       +  orderItem.getOrder().getNumber()  +   " "   +  orderItem);
42               return   new  Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
43                      orderItem.getShots());
44          }  catch  (InterruptedException e) {
45              Thread.currentThread().interrupt();
46               return   null ;
47          }
48      }
49 
50  }

接下來,已經把訂單須要生產的飲料已經完成,如今能夠交給服務員(waier)交給客人了。
這裏使用的aggregate模式,讓服務器等待這個訂單的全部飲料生產完後的,交給客戶.

下面來介紹該應用
<!--  一旦定義了 aggregator,其會自動監測隊列的消息,把消息合併後再發生指定的隊列
    通常aggregator的參照 splitter一塊兒使用。Spring Integration會根據接收到的消息中的消息頭CORRELATION_ID 來判斷,若是有相同的CORRELATION_ID發現,則認爲它們須要合成一組,並返回(若是沒有自定義合組接口)。
    固然Spring Integration也提供一個用戶自定的接口來斷定消息合組是否知足要求
   
public   interface  CompletionStrategy {

  
boolean  isComplete(List < Message <?>>  messages);

}


  isComplete的方法,收到的messages消息,都是擁用相同消息頭CORRELATION_ID的消息。

-->
< aggregator  input-channel ="preparedDrinks"  ref ="waiter"
                method
="prepareDelivery"  output-channel ="deliveries" />

最後,完成訂單的消息會發到 waiter隊列
 1  public   class  Waiter {
 2 
 3       public  Delivery prepareDelivery(List < Drink >  drinks) {
 4           return   new  Delivery(drinks);
 5      }
 6 
 7 
 8  }
 9 
10  public   class  Delivery {
11 
12       private   static   final  String SEPARATOR  =   " ----------------------- " ;
13 
14 
15       private  List < Drink >  deliveredDrinks;
16 
17       private   int  orderNumber;
18 
19 
20       public  Delivery(List < Drink >  deliveredDrinks) {
21           assert (deliveredDrinks.size()  >   0 );
22           this .deliveredDrinks  =  deliveredDrinks;
23           this .orderNumber  =  deliveredDrinks.get( 0 ).getOrderNumber();
24      }
25 
26 
27       public   int  getOrderNumber() {
28           return  orderNumber;
29      }
30 
31       public  List < Drink >  getDeliveredDrinks() {
32           return  deliveredDrinks;
33      }
34 
35      @Override
36       public  String toString() {
37          StringBuffer buffer  =   new  StringBuffer(SEPARATOR  +   " \n " );
38          buffer.append( " Order # "   +  getOrderNumber()  +   " \n " );
39           for  (Drink drink : getDeliveredDrinks()) {
40              buffer.append(drink);
41              buffer.append( " \n " );
42          }
43          buffer.append(SEPARATOR  +   " \n " );
44           return  buffer.toString();
45      }
46 
47  }

最後咱們使用一個 stream channel adaptor把訂單生產完成的飲料輸出。
         <!--  定義一個 stream 適配器,接收 deliveries隊列的消息後,直接輸出到屏幕 -->
    
< stream:stdout-channel-adapter  id ="deliveries" />


這樣整個流程就執行完了,最終咱們的飲料產品就按照訂單生產出來了。累了吧,喝咖啡提神着呢!!!

spring-integration官網: http://www.springsource.org/spring-integration

關於 Annotation的介紹,將在 下篇 介紹。

附:xml配置介紹
Service Activator 配置
1  <!-- 配置 Service Activator,接收exampleChannel隊列消息。注:exampleHandler至少有一個方法@ServiceActivator -->
2  < service-activator  input-channel ="exampleChannel"  ref ="exampleHandler" />
3  <!--  會檢查 someMethod方法,是否有 @ServiceActivato 標註 output-channel -->
4  < service-activator  input-channel ="exampleChannel"  ref ="somePojo"  method ="someMethod" />
5  < service-activator  input-channel ="exampleChannel"  output-channel ="replyChannel"
6                     ref ="somePojo"  method ="someMethod" />

<inbound-channel-adapter>
 觸發指定的方法,接收消息隊列配置(觸發輪循訪問的方式)
 1  < inbound-channel-adapter  ref ="source1"  method ="method1"  channel ="channel1" >
 2       < poller >
 3           < interval-trigger  interval ="5000" />
 4       </ poller >
 5  </ inbound-channel-adapter >
 6 
 7  < inbound-channel-adapter  ref ="source2"  method ="method2"  channel ="channel2" >
 8       < poller >
 9           < cron-trigger  expression ="30 * * * * MON-FRI" />
10       </ poller >
11  </ channel-adapter >

 <outbound-channel-adapter/>
 觸發指定的方法,發送消息
1    < outbound-channel-adapter  channel ="channel1"  ref ="target1"  method ="method1" />
2   
3    < outbound-channel-adapter  channel ="channel2"  ref ="target2"  method ="method2" >
4       < poller >
5           < interval-trigger  interval ="3000" />
6       </ poller >
7  </ outbound-channel-adapter >

Router
消息路由方式
1  < bean  id ="payloadTypeRouter"  class ="org.springframework.integration.router.PayloadTypeRouter" >
2       < property  name ="payloadTypeChannelMap" >
3           < map >
4               < entry  key ="java.lang.String"  value-ref ="stringChannel" />
5               < entry  key ="java.lang.Integer"  value-ref ="integerChannel" />
6           </ map >
7       </ property >
8  </ bean >

Aggregator 消息合併
 1  < channel  id ="inputChannel" />
 2 
 3  < aggregator  id ="completelyDefinedAggregator"  1
 4      input-channel ="inputChannel"  2
 5      output-channel ="outputChannel"   3
 6      discard-channel ="discardChannel"   4
 7      ref ="aggregatorBean"  5
 8      method ="add"  6
 9      completion-strategy ="completionStrategyBean"   7
10      completion-strategy-method ="checkCompleteness"  8
11      timeout ="42"  9
12      send-partial-result-on-timeout ="true"  10
13      reaper-interval ="135"  11
14      tracked-correlation-id-capacity ="99"  12
15      send-timeout ="86420000"  13  />  
16 
17  < channel  id ="outputChannel" />
18 
19  < bean  id ="aggregatorBean"  class ="sample.PojoAggregator" />
20 
21  < bean  id ="completionStrategyBean"  class ="sample.PojoCompletionStrategy" />

1

The id of the aggregator is optional. html

2

The input channel of the aggregator. Required. java

3

The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves). spring

4

The channel where the aggregator will send the messages that timed out (if send-partial-results-on-timeout is false)Optional. express

5

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required. 服務器

6

A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, with restrictions (see above). app

7

A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator . ide

8

A method defined on the bean referenced by completion-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires completion-strategy to be present). ui

9

The timeout for aggregating messages (counted from the arrival of the first message). Optional. this

10

Whether upon the expiration of the timeout, the aggregator shall try to aggregate the already arrived messages. Optional (false by default). spa

11

The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional.

12

The capacity of the correlation id tracker. Remembers the already processed correlation ids, preventing the formation of new groups for messages that arrive after their group has been already processed (aggregated or discarded). Optional.

13

The timeout for sending out messages. Optional.


配置消息合併策略
 1  public   class  PojoCompletionStrategy {
 2 
 3     public   boolean  checkCompleteness(List < Long >  numbers) {
 4       int  sum  =   0 ;
 5       for  ( long  number: numbers) {
 6        sum  +=  number;
 7      }
 8       return  sum  >=  maxValue;
 9    }
10  }


Good Luck!
Yours Matthew!
相關文章
相關標籤/搜索