本系列的(1)和(2)介紹瞭如何修改File Connector的輪詢機制,使得File Connector每次輪詢只輪詢一個文件。本文主要闡述如何對前面的實現作進一步的修改,這個修改有如下兩點:java
1)如何自定義定義File Connector輪詢文件時使用的Comparator。git
2)如何替換Blocking Queue爲Mule ESB定義的VM Transport中的Queue.github
File Connector 默認的文件排序Comparator是查看file:inbound-endpoint中是否認義了comparator,若是定義了指定的comparator類,則按照comparator類中定義的排序規則對文件列表進行排序,不然就按照輪詢文件時文件放入列表的順序。app
FileMessageReceiver.java異步
protected Comparator<File> getComparator() throws Exception { Object comparatorClassName = getEndpoint().getProperty(COMPARATOR_CLASS_NAME_PROPERTY); if (comparatorClassName != null) { Object reverseProperty = this.getEndpoint().getProperty(COMPARATOR_REVERSE_ORDER_PROPERTY); boolean reverse = false; if (reverseProperty != null) { reverse = Boolean.valueOf((String) reverseProperty); } Class<?> clazz = endpoint.getMuleContext().getExecutionClassLoader().loadClass(comparatorClassName.toString()); Comparator<?> comparator = (Comparator<?>)clazz.newInstance(); return reverse ? new ReverseComparator(comparator) : comparator; } return null; }
Mule ESB提供了一個按照文件修改時間進行排序的Comparator類OlderFirstComparator,若是咱們想根據文件修改時間進行輪詢能夠引用這個Comparator類。async
public class OlderFirstComparator implements Comparator { public int compare(Object o1, Object o2) { if (o1 instanceof File && o2 instanceof File) { File f = (File) o1; File f1 = (File) o2; boolean fileNewer = FileUtils.isFileNewer(f, f1); boolean fileOlder = FileUtils.isFileOlder(f, f1); if (!fileNewer && !fileOlder) { return 0; } else if (fileNewer) { return 1; } else { return -1; } } throw new IllegalArgumentException(MessageFormat.format( "Expected java.io.File instance, but was {0} and {1}", ClassUtils.getShortClassName(o1, "<null>"), ClassUtils.getShortClassName(o2, "<null"))); } }
若是咱們想按照文件名對文件進行輪詢,須要自定義Comparator類,而且把這個Compartor類指定給file:inbound-endpoint的compartor屬性。ide
咱們參照OlderFirstComparator類,定義FileNameSeparator類函數
public class FileNameSeparator implements Comparator<File> { public int compare(File file1, File file2) { String fileName1 = file1.getName(); String fileName2 = file2.getName(); return fileName1.compareToIgnoreCase(fileName2); } }
這裏只比較了文件名,若是還須要比較文件夾名,能夠自行添加相應代碼。優化
最後咱們設置FileNameSeparator類到file:inbound-endpoint上 this
這裏的Reverse Order複選框勾上,表示按照FileNameSeparator的逆序進行文件排序。設置後生成的file:inbound-endpoint的xml文檔以下:
<file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File" connector-ref="input" comparator="fileconnectortest.FileNameSeparator" reverseOrder="true"/>
咱們還使用students1.csv和students2.csv兩個文件,運行ESB項目,查看運行日誌。
從日誌能夠看出,按文件名逆序排列,students2.csv文件先被處理,再是students1.csv文件被處理。
這個系列中使用Blocking Queue對象(ArrayBlockingQueue)存儲文件解析後的各行文本內容,分別使用PutContentTask和ReadContentTask對Blocking Queue進行寫和讀。這樣作有如下缺點:
1)Blocking Queue與讀寫Task造成了緊耦合關係,每新建一個Task,都必須傳給它一個Blocking Queue引用,這樣不利於代碼解耦。
2)定義和管理Blocking Queue過於繁瑣,若是須要使用多個Blocking Queue,就須要在applicationContext.xml文件中定義多個Blocking Queue的bean對象。
3)不容易對Blocking Queue的讀寫線程進行優化配置。
如今對Blocking Queue進行讀寫的線程都是自定義的Task,放到指定線程池裏執行,若是要對讀寫線程進行優化配置,就須要在applicationContext.xml文件中定義的
線程池進行配置。若是想訪問多個Blocking Queue,且讀寫操做分別使用不一樣的線程池設置,須要定義和配置多個線程池對象,這樣的操做會很是繁瑣。
4)Blocking Queue的數據沒有實現持久化。
VM Queue是Mule ESB定義的VM Transport定義的內部Queue隊列,用於不一樣Flow之間進行信息傳遞,能夠理解成Mule ESB項目內部的Message Queue,因爲是內部MQ,效率要比外部的MQ通訊效率高,Mule的組件能夠像與MQ通訊同樣,向VM Queue發送消息,或者從VM Queue獲取消息,組件不必定與VM Endpoint相連。這樣就實現了VM Queue與Mule 邏輯組件之間的鬆耦合。並且Mule ESB還能夠經過receiver-thread-profile和dispatcher-thread-profile設置VM Queue的消息接收者和發送者的線程數。
基於VM Queue,咱們對系列的項目作如下修改
1)從applicationContext.xml文件中去掉Blocking Queue的定義。修改FileTransformer類,去掉對ReadContentTask的使用。
public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { logger.info("Process File"); try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "false"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } //The file input stream sent by file connector. InputStream inputStream = (InputStream)message.getPayload(); PutContentTask putContentTask = new PutContentTask(muleContext, inputStream); queueExecutor.execute(putContentTask); return null; }
2)在ESB項目中新建一個Flow,在Source部分拖入一個VM inbound endpoint,設置它的Path爲data,這個VM Queue的訪問地址即爲vm://data
3) 修改PutContentTask類,去掉原有的Blocking Queue屬性,添加MuleContext屬性muleContext。在構造函數中傳入muleContext。解析文件時將每行文本信息封裝到Mule Message中,發送到VM Queue。
public class PutContentTask implements Runnable { //Current mule context. private MuleContext muleContext; ......... public PutContentTask(MuleContext muleContext, InputStream inputStream) { this.muleContext = muleContext; this.inputStream = inputStream; } @Override public void run() { try { MuleClient muleClient = muleContext.getClient(); Scanner scanner = new Scanner(inputStream); DefaultMuleMessage newMessage = null; while (scanner.hasNext()) { String line = scanner.nextLine(); //Wrapper each line content into mule message and send to vm queue. newMessage = new DefaultMuleMessage(line, muleContext); muleClient.send("vm://data", newMessage); } //Wrapper EOF content into mule message to notify the reader component instance //the file has been parsed completely. DefaultMuleMessage endMessage = new DefaultMuleMessage("EOF", muleContext); muleClient.send("vm://data", endMessage); scanner.close();
關於發送Mule Message到VM Queue有兩點須要注意:
4) 刪除ReadContentTask類,在VM inbound endpoint後建立一個自定義Component類ReadContentComponent
ReadContentComponent類的代碼以下:
public class ReadContentComponent implements Callable { private Logger logger = LogManager.getLogger(ReadContentComponent.class); @Override public Object onCall(MuleEventContext eventContext) throws Exception { MuleMessage muleMessage = eventContext.getMessage(); MuleContext muleContext = eventContext.getMuleContext(); String threadName = Thread.currentThread().getName(); if(muleMessage !=null && muleMessage.getPayload() != null) { String message = muleMessage.getPayloadAsString(); if(message.equals("EOF")) { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "true"); logger.info("Set Complete Flag True"); } else { System.out.println(threadName + " Line is:" + message); } } return eventContext; } }
5) 配置VM inbound endpoint引用的VM Connector,咱們自定義了一個VM Connector memoryStore,並把它賦給VM inbound endpoint
<vm:connector name="memoryStore" doc:name="VM" validateConnections="true"> <vm:queue-profile> <simple-in-memory-queue-store/> </vm:queue-profile> </vm:connector> <flow name="Read_Data_Flow"> <vm:inbound-endpoint exchange-pattern="one-way" path="data" doc:name="VM" connector-ref="memoryStore"/>
memoryStore的queue使用memory store,即queue發送和接收的Mule Message存在於內存中,當ESB項目終止後Mule Message消息會丟失。咱們也能夠設置queue的profile爲持久化的store,例以下圖中的 file-queue-store或者default-persistent-queue-store.
配置好後,咱們拷貝文件到監控目錄,啓動ESB項目,能夠看到文件被正常解析和處理。
從輸出的結果日誌中的Thread Name(Read_Data_Flow.stage1.xxx)能夠看出,Read_Data_Flow使用了多個線程接收和處理Mule Message。默認Read_Data_Flow使用最大16個線程對VM Queue的Mule Message進行接收和處理。咱們能夠設置這個線程數。咱們在流程文件中定義一個Flow Processing Strategy,定義最大線程數爲4。
<queued-asynchronous-processing-strategy name="singleThreadStrategy" maxThreads="4" minThreads="4" poolExhaustedAction="WAIT" doc:name="Queued Asynchronous Processing Strategy" />
運行ESB項目,使用jconsole查看Read_Data_Flow相關的線程,能夠看到確實有4個線程。
能夠看出線程名爲stage1.01的線程的堆棧和其餘三個線程的堆棧不同,這是由於它們執行的是不一樣的操做。
若是修改log4j2.xml,將org.mule,org.mulesoft的日誌級別從INFO改成DEBUG,能夠從日誌文件看到每一個線程啓動的Work信息
com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.SedaStageInterceptingMessageProcessor in Thread[[fileconnectortest].Read_Data_Flow.stage1.01,5,main]. Active tasks: 1 (1 threads in a pool) com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@747140de in Thread[[fileconnectortest]. Read_Data_Flow.stage1.02,5,main]. Active tasks: 2 (2 threads in a pool) com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@6cad0545 in Thread[[fileconnectortest]. Read_Data_Flow.stage1.03,5,main]. Active tasks: 3 (3 threads in a pool) Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@21c436f5 in Thread[[fileconnectortest]. Read_Data_Flow.stage1.04,5,main]. Active tasks: 4 (4 threads in a pool)
咱們能夠看到stage1.01激活的Task相關的Message Processor是SedaStageInterceptingMessageProcessor,而其餘三個線程相關的是AsyncMessageProcessorWorker,也就是說第一個線程負責從VM Queue中讀取Mule Message,並將Message推送給後續的Component,然後面三個線程負責接收Mule Message並處理Mule Message。這體現了Mule ESB使用的SEDA機制(以下圖所示,此圖參考Mule in Action 第二版11.1)
stage1.01線程是從Event Queue中獲取Mule Event對象(Mule Message),stage1.02,stage1.03,stage1.04線程則位於Processing Stage,接收stage1.01取出的Mule Event對象並處理。
瞭解了Read_Data_Flow的線程分工原理,若是咱們要設置Read_Data_Flow對VM Queue中取出的Mule Message作單線程處理,queued-asynchronous-processing-strategy中配置的線程數最小值必須是2,不然只有獲取Mule Event的線程運行,而沒有處理Mule Event的線程存在。咱們也能夠直接配置Synchronous Processing Strategy給Read_Data_Flow,保證整個Flow單線程處理。
<flow name="Read_Data_Flow" processingStrategy="synchronous">
這時接收Mule Event,處理消息的線程是queue executor
2016-08-11 17:07:01,737 [queueExecutor-4] DEBUG fileconnectortest.ReadContentComponent - queueExecutor-4 Line is:20160205,Student205,Class5
修改的代碼上傳到fileconnectortest_vm分支
https://github.com/yosaku01/fileconnectortest/tree/fileconnectortest_vm