一、概念
能夠理解成一個隊列,在該隊列不一樣節點部分進行相應的邏輯操做,實現輕量級消息傳遞java
二、結構流程
三、代碼結構:
3.1配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:integration="http://www.springframework.org/schema/integration" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <context:annotation-config /> <context:component-scan base-package="ch.javaee.integration.example.helloWorld"/> <!-- this channel is called by the application and the message is passed to it --> <integration:channel id="inputChannel"/> <integration:channel id="middleChannel"/> <!-- this channel receive the modified message --> <integration:channel id="outputChannel"/> <integration:channel id="finalChannel"/> <!-- this service transform the message in input-channel and send the result to output-channel --> <!-- the service method to call is referenced in explicitly --> <integration:service-activator input-channel="inputChannel" ref="myTestProducerService" method="poll" output-channel="middleChannel"/> <!-- this service receives a message and pass it to printerService --> <!-- the method that consumes the message is implicitly defined by the @ServiceActivator annotation or it should be the only method in the class --> <integration:service-activator input-channel="middleChannel" ref="myTestConsumerService" output-channel="outputChannel"/> <integration:service-activator input-channel="outputChannel" ref="myTestFinalConsumerService" output-channel="finalChannel"/> </beans>
其中一、二、三、4都是定義的channel spring
五、六、7是針對各個channel中數據處理,ref是處理input-channel數據的bean名稱,method是ref中bean裏的方法,output-channel存儲是處理後的結果app
Service Activator:可調用Spring的Bean來處理消息,並將處理後的結果輸出到指定的消息通道,其做用使用起來和在配置文件中的method="****"相似ide
3.2 channel數據處理邏輯
以5爲例:
<integration:service-activator input-channel="inputChannel" ref="myTestProducerService" method="poll" output-channel="middleChannel"/>
3.2.1 input-channel內數據內容:
public class MyTestDemo { public static void main(String[] args) { MyTestProducerService myTestService=new MyTestProducerService(); // create a message with the content "World" // buildMessageAndSend("notification", "identify"); List<String> stringList=new ArrayList<String>(); stringList.add("Monday"); stringList.add("Tuesday"); stringList.add("Wednesday"); for(int i=0;i<stringList.size();i++){ myTestService.putMessage(stringList.get(i)); } buildMessageAndSend("notification", "shutdown"); } private static Map<String, String> buildMessageMap(String type, String msg) { Map<String, String> msgMap = new HashMap<String, String>(); msgMap.put("type", type); msgMap.put("message", msg); msgMap.put("context_id", "123"); return msgMap; } private static void buildMessageAndSend(String type, String msg) { Map<String, String> notifyMap = buildMessageMap(type, msg); // load the Spring context ApplicationContext context = new ClassPathXmlApplicationContext("my-spring-config.xml"); // get the reference to the message channel MessageChannel channel = context.getBean("inputChannel", MessageChannel.class); try { if(channel.send(MessageBuilder.withPayload(notifyMap).build()) == false) { throw new Exception("Unable to publish the shutdown notification."); } } catch (Exception e) { System.out.println(e); } } }
其中 1:加載配置文件ui
2:獲取輸入channel名字this
3:發送數據到input-channel中spa
3.2.2 處理input-channel中數據
@Service public class MyTestProducerService { /** The message queue. */ private static BlockingQueue<Object> messageQueue = new ArrayBlockingQueue<Object>( 100, true); public static void putMessage(Object obj){ messageQueue.add(obj); } public final Message<Object> poll() throws InterruptedException { Object obj = null; obj=messageQueue.take(); Message<Object> message = null; if(obj instanceof String){ String objStr=(String) obj; message= MessageBuilder.withPayload((Object)objStr) .build(); } return message; } }
其中1:對應3.1節中5中ref對應的beancode
2:對應3.1節中5中method對應的方法component
3:返回給3.1節中5中output-channel中的數據orm