Mule ESB File Connector輪詢單個文件的實現(3)

本系列的(1)和(2)介紹瞭如何修改File Connector的輪詢機制,使得File Connector每次輪詢只輪詢一個文件。本文主要闡述如何對前面的實現作進一步的修改,這個修改有如下兩點:java

1)如何自定義定義File Connector輪詢文件時使用的Comparator。git

2)如何替換Blocking Queue爲Mule ESB定義的VM Transport中的Queue.github

File Connector 自定義Comparator

File Connector 默認的文件排序Comparator是查看file:inbound-endpoint中是否認義了comparator,若是定義了指定的comparator類,則按照comparator類中定義的排序規則對文件列表進行排序,不然就按照輪詢文件時文件放入列表的順序。app

FileMessageReceiver.java異步

protected Comparator<File> getComparator() throws Exception
{
	Object comparatorClassName = getEndpoint().getProperty(COMPARATOR_CLASS_NAME_PROPERTY);
	if (comparatorClassName != null)
	{
		Object reverseProperty = this.getEndpoint().getProperty(COMPARATOR_REVERSE_ORDER_PROPERTY);
		boolean reverse = false;
		if (reverseProperty != null)
		{
			reverse = Boolean.valueOf((String) reverseProperty);
		}

		Class<?> clazz = endpoint.getMuleContext().getExecutionClassLoader().loadClass(comparatorClassName.toString());
		Comparator<?> comparator = (Comparator<?>)clazz.newInstance();
		return reverse ? new ReverseComparator(comparator) : comparator;
	}
	return null;
}

Mule ESB提供了一個按照文件修改時間進行排序的Comparator類OlderFirstComparator,若是咱們想根據文件修改時間進行輪詢能夠引用這個Comparator類。async

public class OlderFirstComparator implements Comparator
{
    public int compare(Object o1, Object o2)
    {
        if (o1 instanceof File && o2 instanceof File)
        {
            File f = (File) o1;
            File f1 = (File) o2;
            boolean fileNewer = FileUtils.isFileNewer(f, f1);
            boolean fileOlder = FileUtils.isFileOlder(f, f1);
            if (!fileNewer && !fileOlder)
            {
                return 0;
            }
            else if (fileNewer)
            {
                return 1;
            }
            else
            {
                return -1;
            }

        }
        throw new IllegalArgumentException(MessageFormat.format(
                "Expected java.io.File instance, but was {0} and {1}",
                ClassUtils.getShortClassName(o1, "<null>"),
                ClassUtils.getShortClassName(o2, "<null")));
    }
}

    若是咱們想按照文件名對文件進行輪詢,須要自定義Comparator類,而且把這個Compartor類指定給file:inbound-endpoint的compartor屬性。ide

    咱們參照OlderFirstComparator類,定義FileNameSeparator類函數

public class FileNameSeparator implements Comparator<File> 
{	
	public int compare(File file1, File file2) 
	{		
        String fileName1 = file1.getName();        
        String fileName2 = file2.getName();       
        return fileName1.compareToIgnoreCase(fileName2);
	}
}

這裏只比較了文件名,若是還須要比較文件夾名,能夠自行添加相應代碼。優化

    最後咱們設置FileNameSeparator類到file:inbound-endpoint上    this

這裏的Reverse Order複選框勾上,表示按照FileNameSeparator的逆序進行文件排序。設置後生成的file:inbound-endpoint的xml文檔以下:

<file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File"   
        		connector-ref="input" comparator="fileconnectortest.FileNameSeparator" 
reverseOrder="true"/>

咱們還使用students1.csv和students2.csv兩個文件,運行ESB項目,查看運行日誌。

從日誌能夠看出,按文件名逆序排列,students2.csv文件先被處理,再是students1.csv文件被處理。

使用VM Queue替換Blocking Queue

    這個系列中使用Blocking Queue對象(ArrayBlockingQueue)存儲文件解析後的各行文本內容,分別使用PutContentTask和ReadContentTask對Blocking Queue進行寫和讀。這樣作有如下缺點:
    1)Blocking Queue與讀寫Task造成了緊耦合關係,每新建一個Task,都必須傳給它一個Blocking Queue引用,這樣不利於代碼解耦。
    2)定義和管理Blocking Queue過於繁瑣,若是須要使用多個Blocking Queue,就須要在applicationContext.xml文件中定義多個Blocking Queue的bean對象。
    3)不容易對Blocking Queue的讀寫線程進行優化配置。
      如今對Blocking Queue進行讀寫的線程都是自定義的Task,放到指定線程池裏執行,若是要對讀寫線程進行優化配置,就須要在applicationContext.xml文件中定義的
      線程池進行配置。若是想訪問多個Blocking Queue,且讀寫操做分別使用不一樣的線程池設置,須要定義和配置多個線程池對象,這樣的操做會很是繁瑣。

    4)Blocking Queue的數據沒有實現持久化。

     VM Queue是Mule ESB定義的VM Transport定義的內部Queue隊列,用於不一樣Flow之間進行信息傳遞,能夠理解成Mule ESB項目內部的Message Queue,因爲是內部MQ,效率要比外部的MQ通訊效率高,Mule的組件能夠像與MQ通訊同樣,向VM Queue發送消息,或者從VM Queue獲取消息,組件不必定與VM Endpoint相連。這樣就實現了VM Queue與Mule 邏輯組件之間的鬆耦合。並且Mule ESB還能夠經過receiver-thread-profile和dispatcher-thread-profile設置VM Queue的消息接收者和發送者的線程數。

    基於VM Queue,咱們對系列的項目作如下修改

1)從applicationContext.xml文件中去掉Blocking Queue的定義。修改FileTransformer類,去掉對ReadContentTask的使用。  

public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {

        logger.info("Process File");        
        try 
        {
            muleContext.getRegistry().unregisterObject("processCompleteFlag");
            muleContext.getRegistry().registerObject("processCompleteFlag", "false");
        } 
        catch (RegistrationException e) {
            logger.error(ExceptionUtils.getFullStackTrace(e));
        }        


        //The file input stream sent by file connector.
        InputStream inputStream = 
                (InputStream)message.getPayload();

        PutContentTask putContentTask =
                new PutContentTask(muleContext, inputStream);
        queueExecutor.execute(putContentTask);          

        return null;
    }

2)在ESB項目中新建一個Flow,在Source部分拖入一個VM inbound endpoint,設置它的Path爲data,這個VM Queue的訪問地址即爲vm://data

3) 修改PutContentTask類,去掉原有的Blocking Queue屬性,添加MuleContext屬性muleContext。在構造函數中傳入muleContext。解析文件時將每行文本信息封裝到Mule Message中,發送到VM Queue。

public class PutContentTask implements Runnable {
	//Current mule context.
	private MuleContext muleContext;
    .........

    public PutContentTask(MuleContext muleContext, InputStream inputStream) {
        this.muleContext = muleContext;
        this.inputStream = inputStream;
    }

    @Override
    public void run() {
    try {
        MuleClient muleClient = muleContext.getClient();
        Scanner scanner = new Scanner(inputStream);
        DefaultMuleMessage newMessage = null;
        while (scanner.hasNext()) {
            String line = scanner.nextLine();
            //Wrapper each line content into mule message and send to vm queue.
            newMessage = new DefaultMuleMessage(line, muleContext);
            muleClient.send("vm://data", newMessage);
        }
        //Wrapper EOF content into mule message to notify the reader component instance
        //the file has been parsed completely.
        DefaultMuleMessage endMessage = new DefaultMuleMessage("EOF", muleContext);
        muleClient.send("vm://data", endMessage);
        scanner.close();

關於發送Mule Message到VM Queue有兩點須要注意:

  • 咱們沒有在File Parser這個Transformer控件後添加VM outbound endpoint,雖然這樣作,咱們能夠沒必要使用MuleClient對象發送Mule Message,可是這樣發送的Mule Message的Payload將是File Parser這個Transformer的transformMessage方法的返回值。這樣是沒法將文件的各行文本逐一發送給VM Queue的。所以咱們仍是採用後臺線程的方式解析文件,發送Mule Message(也能夠在FileTransformer主線程中進行)。
  • 發送Mule Message的方法咱們使用了同步方法send,而不是異步方法dispatch,這是由於咱們須要保證發送到VM Queue中的Mule Message的消息順序。若是使用dispatch方法發送,EOF消息可能先於前面各行的文本消息先發送,這樣接收端在接收到EOF消息時,將沒法準確判斷文件解析是否已經結束。所以咱們使用了同步發送方法send。若是對發送消息的順序沒有特殊要求,可使用dispatch方法。(若是使用send方法發送Message,VM inbound endpoint必須至少一個receiver,這也是咱們下面定義Component從VM Queue中獲取Mule Message的緣由)。

4) 刪除ReadContentTask類,在VM inbound endpoint後建立一個自定義Component類ReadContentComponent

  

ReadContentComponent類的代碼以下:

public class ReadContentComponent implements Callable {

    private Logger logger = LogManager.getLogger(ReadContentComponent.class);

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        MuleMessage muleMessage = eventContext.getMessage();
        MuleContext muleContext = eventContext.getMuleContext();
        String threadName = Thread.currentThread().getName();
        if(muleMessage !=null && muleMessage.getPayload() != null)
        {
                String message = muleMessage.getPayloadAsString();    
                if(message.equals("EOF"))
                {                    
                    muleContext.getRegistry().unregisterObject("processCompleteFlag");
                    muleContext.getRegistry().registerObject("processCompleteFlag", "true");
                    logger.info("Set Complete Flag True");        
                }
                else
                {
                    System.out.println(threadName + " Line is:" + message);
                }
        }
        return eventContext;
    }
}

5) 配置VM inbound endpoint引用的VM Connector,咱們自定義了一個VM Connector memoryStore,並把它賦給VM inbound endpoint

<vm:connector name="memoryStore" doc:name="VM" validateConnections="true"> 
   <vm:queue-profile>
       <simple-in-memory-queue-store/>
   </vm:queue-profile>
</vm:connector>

<flow name="Read_Data_Flow">
   <vm:inbound-endpoint exchange-pattern="one-way" 
       path="data" doc:name="VM" connector-ref="memoryStore"/>

    memoryStore的queue使用memory store,即queue發送和接收的Mule Message存在於內存中,當ESB項目終止後Mule Message消息會丟失。咱們也能夠設置queue的profile爲持久化的store,例以下圖中的   file-queue-store或者default-persistent-queue-store.

   配置好後,咱們拷貝文件到監控目錄,啓動ESB項目,能夠看到文件被正常解析和處理。

從輸出的結果日誌中的Thread Name(Read_Data_Flow.stage1.xxx)能夠看出,Read_Data_Flow使用了多個線程接收和處理Mule Message。默認Read_Data_Flow使用最大16個線程對VM Queue的Mule Message進行接收和處理。咱們能夠設置這個線程數。咱們在流程文件中定義一個Flow Processing Strategy,定義最大線程數爲4。

<queued-asynchronous-processing-strategy 
    	name="singleThreadStrategy" maxThreads="4"  minThreads="4" 
    	poolExhaustedAction="WAIT" doc:name="Queued Asynchronous Processing Strategy" />

運行ESB項目,使用jconsole查看Read_Data_Flow相關的線程,能夠看到確實有4個線程。

能夠看出線程名爲stage1.01的線程的堆棧和其餘三個線程的堆棧不同,這是由於它們執行的是不一樣的操做。

若是修改log4j2.xml,將org.mule,org.mulesoft的日誌級別從INFO改成DEBUG,能夠從日誌文件看到每一個線程啓動的Work信息

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.SedaStageInterceptingMessageProcessor 
in Thread[[fileconnectortest].Read_Data_Flow.stage1.01,5,main]. 
Active tasks: 1 (1 threads in a pool)

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@747140de 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.02,5,main]. 
Active tasks: 2 (2 threads in a pool)

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@6cad0545 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.03,5,main]. Active tasks: 3 (3 threads in a pool)

Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@21c436f5 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.04,5,main]. Active tasks: 4 (4 threads in a pool)

咱們能夠看到stage1.01激活的Task相關的Message Processor是SedaStageInterceptingMessageProcessor,而其餘三個線程相關的是AsyncMessageProcessorWorker,也就是說第一個線程負責從VM Queue中讀取Mule Message,並將Message推送給後續的Component,然後面三個線程負責接收Mule Message並處理Mule Message。這體現了Mule ESB使用的SEDA機制(以下圖所示,此圖參考Mule in Action 第二版11.1)

       stage1.01線程是從Event Queue中獲取Mule Event對象(Mule Message),stage1.02,stage1.03,stage1.04線程則位於Processing Stage,接收stage1.01取出的Mule Event對象並處理。

      瞭解了Read_Data_Flow的線程分工原理,若是咱們要設置Read_Data_Flow對VM Queue中取出的Mule Message作單線程處理,queued-asynchronous-processing-strategy中配置的線程數最小值必須是2,不然只有獲取Mule Event的線程運行,而沒有處理Mule Event的線程存在。咱們也能夠直接配置Synchronous Processing Strategy給Read_Data_Flow,保證整個Flow單線程處理。

<flow name="Read_Data_Flow" processingStrategy="synchronous">

這時接收Mule Event,處理消息的線程是queue executor

2016-08-11 17:07:01,737 [queueExecutor-4] 
DEBUG fileconnectortest.ReadContentComponent - 
queueExecutor-4 Line is:20160205,Student205,Class5

修改的代碼上傳到fileconnectortest_vm分支

https://github.com/yosaku01/fileconnectortest/tree/fileconnectortest_vm

相關文章
相關標籤/搜索