好久沒有更新Blog了,今天突如其來的閒暇發現了一個很好的功能,籍此更新一下。 java
上一篇基本介紹了Spring Integration,以及其工做模式。也許你會以爲它沒有特點的東西,本身實現也沒有多大難度。是的!本身實現確實是沒多大難度,可是本身實現有要具備良好擴展性的仍是不是那麼容易的。 spring
咱們的系統常常要和其聯繫的多個系統一塊兒協同工做。他們都操做着數據庫的同一張表。如:一個系統向表寫數據,另外一個系統定時的掃描新加入的數據,而後把新加入的數據提取出來,作一些處理。而後更新標誌或者轉移。 sql
這樣的場景你遇到過嗎? 反正咱們是有不少的這樣的場景。 數據庫
我以這樣的爲例作個Demo。 ide
我建立一個這樣的數據庫。DDL SQL如: 測試
CREATE TABLE PUBLIC.PUBLIC.ATTR_MESSAGE ( ATT_CODE VARCHAR(20) NOT NULL, PARENT_CODE VARCHAR(20), ATT_TEXT VARCHAR(100), SEQ NUMERIC(8, 0), OPT_DATE DATE, MARK VARCHAR(1) DEFAULT 'N', PRIMARY KEY(ATT_CODE) );
如上面的表結構,我從別的數據庫提取了幾個列建立一個表。 MARK就是一個標誌列,當新加入的數據爲N, 處理後的會置成Y。 ui
Spring Integration JDBC能給你完成幾乎全部代碼。以下面的Spring配置: url
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:integration="http://www.springframework.org/schema/integration" xmlns:hdbc="http://www.springframework.org/schema/integration/jdbc" xmlns:stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.1.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.1.xsd"> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:jdbc/jdbc.properties</value> </list> </property> </bean> <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="${jdbc.driverClass}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <bean id="jdbcMessageHandler" class="net.dintegration.handler.JdbcMessageHandler" /> <integration:channel id="jdbcinchannel" /> <hdbc:inbound-channel-adapter channel="jdbcinchannel" data-source="dataSource" query="SELECT ATT_CODE, PARENT_CODE, ATT_TEXT, SEQ, OPT_DATE, MARK FROM ATTR_MESSAGE WHERE MARK = 'N'" update="UPDATE ATTR_MESSAGE SET MARK = 'Y' WHERE ATT_CODE IN (:ATT_CODE)"> <integration:poller fixed-rate="10000"> <integration:transactional /> </integration:poller> </hdbc:inbound-channel-adapter> <integration:service-activator input-channel="jdbcinchannel" ref="jdbcMessageHandler"/> </beans>
請你注意其中的: spa
query="SELECT ATT_CODE, PARENT_CODE, ATT_TEXT, SEQ, OPT_DATE, MARK FROM ATTR_MESSAGE WHERE MARK = 'N'" 線程 update="UPDATE ATTR_MESSAGE SET MARK = 'Y' WHERE ATT_CODE IN (:ATT_CODE)" |
如上,咱們只須要編寫一個 jdbcMessageHandler處理咱們的數據就好,其餘的一切都讓Spring Integration爲咱們作好了。
public class JdbcMessageHandler implements MessageHandler { private static Log log = LogFactory.getLog(JdbcMessageHandler.class); public JdbcMessageHandler() { } @Override public void handleMessage(Message<?> message) throws MessagingException { Object obj = message.getPayload(); //分別按照各類樣式輸出obj if(obj == null) { log.info("null"); } else if(obj instanceof String) { log.info(obj); }else if(obj instanceof List) { List bean = (List)obj; log.info(bean); } else { log.info(ReflectionToStringBuilder.reflectionToString(message)); } } }
OK。我向創建的表中插入2條數據, 而後測試。測試類:
public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("jdbc/jdbcIntegrationContext.xml"); context.start(); //讓線程在這裏阻塞,防止JVM退出 }
測試log如:
11-26 19:27:18 [INFO] [support.DefaultLifecycleProcessor(334)] Starting beans in phase 2147483647 11-26 19:27:19 [INFO] [handler.JdbcMessageHandler(49)] [{ATT_CODE=123456, PARENT_CODE=Root, ATT_TEXT=測試數據, SEQ=1, OPT_DATE=14:17:47, MARK=N}, {ATT_CODE=234567, PARENT_CODE=123456, ATT_TEXT=test, SEQ=2, OPT_DATE=14:20:41, MARK=N}] |
它也再次輸出了日誌,如:
11-26 19:30:18 [INFO] [handler.JdbcMessageHandler(49)] [{ATT_CODE=123456, PARENT_CODE=Root, ATT_TEXT=測試數據, SEQ=1, OPT_DATE=14:17:47, MARK=N}, {ATT_CODE=234567, PARENT_CODE=123456, ATT_TEXT=test, SEQ=2, OPT_DATE=14:20:41, MARK=N}] |
它又讀取了MARK爲N的數據。就這樣幾乎不寫任何多餘的代碼就實現了我上面提到的場景。而咱們須要作的,僅僅寫一個MessageHandler處理咱們的數據。
那麼他的擴展性呢?
若是你仔細看了,你就發現
<integration:channel id="jdbcinchannel" />
<integration:service-activator input-channel="jdbcinchannel" ref="jdbcMessageHandler"/>這樣的代碼在上一篇JMS也曾出現過相似的.如:
<integration:channel id="jmsinchannel"/> <integration:channel id="jmsoutchannel" /> <jms:inbound-channel-adapter id="jmsIn" destination="myTopic" channel="jmsinchannel" jms-template="jmsTemplate"> <integration:poller fixed-rate="30000"/> </jms:inbound-channel-adapter> <integration:transformer ref="messageTransformer" input-channel="jmsinchannel" output-channel="jmsoutchannel" /> <integration:service-activator ref="messageHander" input-channel="jmsoutchannel" />
是的,Spring Integration就是經過相似的方式把任何的數據經過管道同樣的把數據導向下一個須要的地方。