使用Lingo加強JMS

雖然activemq+jencks的jms輕量級解決方案已經很好地在psa中work了,尤爲spring的JmsTemplate使得代碼更簡單,可是仍是存在問題。
問題來自暑期作psa的時候,linke忽然提出要求,須要MDP返回些處理信息,好比處理結果、異常,以便後臺監控和前臺顯示,可是,MDP沒有返回值也無法返回異常,當時我只能無奈。java

 
Lingo解決了這個問題,它擴展了JMS,容許異步的函數調用。
在下載了lingo-1.0-M1後(雖然1.2.1發佈了,可是尚未文檔支持,因此暫且用1.0),參考其自帶的example,瞭解了它異步函數調用的代碼思路。
 
客戶端擁有服務端的方法接口,客戶端 將callback和相關參數代入接口,進行異步調用,而服務端的接口實現中 利用callback來返回必要的信息
callback實現了EventListener,提供了返回值和異常的接口,另外涉及到兩個方面,首先,callback自己須要輪詢,其次,callback能夠由實例池管理。
 
第一個方面主要參考了lingo的example, 使用semaphore來進行輪詢
第二個方面並無利用實例池,而是 利用ThreadPoolExecutor來newFixedThreadPool,管理不一樣的異步調用線程,來完成對callback的調度。

配置部分:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" " http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
 <bean id="broker" class="org.activemq.spring.BrokerFactoryBean">
  <property name="config" value="classpath:activemq.xml" />
 </bean>
 <bean id="jmsFactory"
  class="org.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL">
   <value>tcp://localhost:61616</value>
  </property>
 </bean>
 <bean id="destination" class="org.activemq.message.ActiveMQQueue">
  <constructor-arg index="0">
   <value>lingo.demo</value>
  </constructor-arg>
 </bean>
 <bean id="invocationFactory"
  class="org.logicblaze.lingo.LingoRemoteInvocationFactory">
  <constructor-arg>
   <bean class="org.logicblaze.lingo.SimpleMetadataStrategy">
    <!-- 容許單向異步調用 -->
    <constructor-arg value="true" />
   </bean>
  </constructor-arg>
 </bean>
 <!-- 客戶端配置 -->
 <bean id="client"
  class="org.logicblaze.lingo.jms.JmsProxyFactoryBean">
  <property name="serviceInterface"
   value="org.openepo.jms.lingo.MailService" />
  <property name="connectionFactory" ref="jmsFactory" />
  <property name="destination" ref="destination" />
  <!-- 容許客戶端單向異步調用 -->
  <property name="remoteInvocationFactory"
   ref="invocationFactory" />
 </bean>
 <!-- 服務端配置 -->
 <bean id="server"
  class="org.logicblaze.lingo.jms.JmsServiceExporter">
  <property name="service" ref="serverImpl" />
  <property name="serviceInterface"
   value="org.openepo.jms.lingo.MailService" />
  <property name="connectionFactory" ref="jmsFactory" />
  <property name="destination" ref="destination" />
  <property name="invocationFactory" ref="invocationFactory" />
 </bean>
 <!-- 服務端代碼實現 -->
 <bean id="serverImpl" class="org.openepo.jms.lingo.MailServiceImpl" />
 <!-- 管理callback池,處理回調結果 -->
 <bean id="asyncManager" class="org.openepo.jms.lingo.AsyncManager" singleton="false">
  <property name="mailClient" ref="client" />
  <property name="threadSize" value="5" />
 </bean>
</beans>

ResultListener和ResultListenerImpl:callback接口及實現。
 
ResultListener.java:

package org.openepo.jms.lingo;
import java.util.EventListener;
public interface ResultListener extends EventListener {
    public void onResult(Object result);
    // lifecycle end methods
    public void stop();
    public void onException(Exception e);
}
 
ResultListenerImpl.java:

package org.openepo.jms.lingo;
import java.util.ArrayList;
import java.util.List;
public class ResultListenerImpl implements ResultListener {
    private List results = new ArrayList();
    private Object semaphore = new Object();
    private boolean stopped;
    private Exception onException;
    private long waitTime = 1000;
 
    public synchronized void onResult(Object result) {
        results.add(result);
        synchronized (semaphore) {
            semaphore.notifyAll();
        }
    }
 
    // lifecycle end methods
    public void stop() {
        stopped = true;
    }
    public void onException(Exception e) {
        onException = e;
    }
    public Exception getOnException() {
        return onException;
    }
    public List getResults() {
        return results;
    }
    public boolean isStopped() {
        return stopped;
    }
    public void waitForAsyncResponses(int messageCount) {
        System.out.println("Waiting for: " + messageCount + " responses to arrive");
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            try {
                if (hasReceivedResponses(messageCount)) {
                    break;
                }
                synchronized (semaphore) {
                    semaphore.wait(waitTime);
                }
            }
            catch (InterruptedException e) {
                System.out.println("Caught: " + e);
            }
        }
        long end = System.currentTimeMillis() - start;
        System.out.println("End of wait for " + end + " millis");
    }
    protected boolean hasReceivedResponse() {
        return results.isEmpty();
    }
    protected synchronized boolean hasReceivedResponses(int messageCount) {
        return results.size() >= messageCount;
    }
    public long getWaitTime() {
        return waitTime;
    }
    public void setWaitTime(long waitTime) {
        this.waitTime = waitTime;
    }
}

MailService和MailServiceImpl:服務代碼。
 
MailService.java:

package org.openepo.jms.lingo;
import java.util.List;
public interface MailService {
 public void asyncSendMail(List<Mail> mails, ResultListener listener);
}
 
MailServiceImpl.java:

package org.openepo.jms.lingo;
import java.util.List;
public class MailServiceImpl implements MailService {
    public void asyncSendMail(List<Mail> mails, ResultListener listener) {
  
      try {
            for (Mail mail : mails) {
             sendMail(mail);
                Thread.sleep(2000);// 服務端時耗
                listener. onResult(mail.getContent() + " Sended Successfully.");
            }
            listener. stop();
        } catch (Exception e) {
         listener. onException(e);
        }
    }
    public void sendMail(Mail mail) throws Exception {
      // 能夠取消下面的註釋來查看服務端將異常傳給客戶端
      //throw new Exception("Error occurs on server side.");
    }
}
在服務端方法中,能夠利用callback將處理結果,是否結束和異常信息返回客戶端.
 
Mail.java:

package org.openepo.jms.lingo;
import java.io.Serializable;
public class Mail implements Serializable {
 
 private static final long serialVersionUID = 1L;
 
 private String content;
 
 public String getContent() {
  return content;
 }
 public void setContent(String content) {
  this.content = content;
 }
 public Mail(String content) {
  this.content = content;
 }
}

AsyncManager: 各種異步調用的方法能夠集中在這個類中,利用線程池來統一控制callback實例
 
AsyncManager.java:

package org.openepo.jms.lingo;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class AsyncManager {
 static private int threadSize = 10;   //callback池大小
 static private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
   .newFixedThreadPool(threadSize); //callback池
 public void setThreadSize(int threadSize) {
  AsyncManager.threadSize = threadSize;
 }
 private MailService mailClient;
 public void setMailClient(MailService mailClient) {
  this.mailClient = mailClient;
 }
 public AsyncManager() {
 }
 public void sendMails(final List<Mail> mails) {
  // callback對象
  final ResultListenerImpl callBack = new ResultListenerImpl();
  callBack.setWaitTime(2000);
  
  // 異步調用
  mailClient. asyncSendMail(mails, callBack);
 
  // 調用線程池中的callback
  executor.execute(new Runnable() {
   public void run() {
    // callBack 阻塞等待n個消息
    callBack. waitForAsyncResponses(mails.size());
    if (callBack. getOnException() != null) {
     // 服務端異常
     System.out.println("Server Exception: "
       + callBack.getOnException().getMessage());
    } else {
     // 獲得服務端處理結果,打印結果
     for (Object result : callBack. getResults()) {
      System.out.println("Result: " + result);
     }
    }
   }
  });
 }
}
上面匿名類的run方法中,在callback的waitForAsyncResponses方法結束後,能夠檢查callback中的信息,進行異常處理等。
 
下面是測試用例:
@Test
public void test() {
 List<Mail> mails = new ArrayList<Mail>();
 mails.add(new Mail("mail1"));
 mails.add(new Mail("mail2"));
 // 計算時間
 long startTime = System.currentTimeMillis();
  
 try {
  // 異步調用
  asyncManager.sendMails(mails);
  // 沒有阻塞
  System.out.println("Cost time "
     + (System.currentTimeMillis() - startTime) + "ms");
   
  //爲查看結果,sleep主線程
  Thread.sleep(10000);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
使用Lingo對JMS加強後,經過callback,使得異步調用的確比較OO了,可是更重要的是服務端的信息能夠經過callback返回給客戶端,客戶端能夠相應地進行處理。 多出了許多代碼,天然複雜度有所增長,可是lingo-1.2.1後,增長了annotation,減小了callback的代碼量。
相關文章
相關標籤/搜索