Mule ESB File Connector輪詢單個文件的實現(2)

本系列(1)中已經實現了對指定文件夾下的文件的輪詢和處理。可是根據File Connector的FileMessageReceiver的poll方法代碼java

@Override
public void poll()
{
   try
   {
       List<File> files = this.listFiles();
       if (logger.isDebugEnabled())
       {
           logger.debug("Files: " + files.toString());
       }
       Comparator<File> comparator = getComparator();
       if (comparator != null)
       {
           Collections.sort(files, comparator);
       }
       for (File file : files)
       {
	    ..................

  從上述代碼能夠看出File Connector是輪詢指定文件夾下的全部文件,逐一送到FileTransformer進行處理,可是這樣作存在一個問題,因爲每一個ReadContentTask是從Blocking Queue從讀取"EOF"來判斷是否文件解析結束,然而在多個文件的狀況下,可能出現PutContentTask解析完一個文件後,繼續解析後續送來的Input Stream,將新的文件內容放入Blocking Queue中,從而致使ReadContentTask從Blocking Queue中繼續讀取非"EOF"的內容,再進行處理。
     這樣就出現了多個文件的內容混雜在一塊兒被處理的狀況,若是這些被處理的文件存在先後關聯,這樣的混雜處理就會出現邏輯錯誤。所以咱們須要將File Connector每次輪詢的文件個數縮減到1個。ide

  網上的文章有推薦重載FileMessageReceiver的poll方法,但實際一試,發現這方法並不現實,由於poll方法引用了不少FileMessageReceiver的private的方法和屬性,若是真要重載poll方法,須要在子類從新定義這些方法和屬性,增長了開發工做量。因爲poll方法是先引用listFiles方法,先掃描指定文件夾下的全部文件,放入列表,傳給poll方法,poll方法再遍歷文件列表,進行處理,而listFiles方法實際是調用basicListFiles方法進行文件夾文件掃描的,所以咱們選擇了重載basicListFiles方法,使得每次輪詢時,送給poll方法的文件列表只包含了單個文件,這樣就實現了單文件輪詢的要求。ui

    咱們自定義了FileMessageReceiver重載basicListFiles方法this

@Override
	protected void basicListFiles(File currentDirectory, List<File> discoveredFiles)
    {	
		File[] files;
        Filter filter = endpoint.getFilter(); 
      //Filter the file or directory according to setting filter conditions.
        if ( filter instanceof FileFilter)
        {
            files = currentDirectory.listFiles((FileFilter)filter);
        }
        else if(filter instanceof FilenameFilter)
        {
            files = currentDirectory.listFiles((FilenameFilter)filter);
        }
        else
        {        	
        	files = currentDirectory.listFiles();
        }

        // the listFiles calls above may actually return null (check the JDK code).
        if (files == null || files.length == 0)
        {        
        	logger.info("No Available Files");
        	logger.info("Process End");
            return;
        }       
        
        List<File> fileList =new ArrayList<File>();        
        scanFiles(currentDirectory, fileList);
        //Sort the scanned file list according to specified comparator.
        if(fileList.size() > 0)
        {
        	 //Compare all the scanned files.
    	    if(fileList.size() >1)
    	    {
    	       		Comparator<File> comparator = null;
    				try 
    				{
    					comparator = getComparator();
    					if (comparator != null)
    		            {
    		                 Collections.sort(fileList, comparator);
    		            }
    				} 
    				catch (Exception e) 
    				{
    					logger.error(ExceptionUtils.getFullStackTrace(e));
    				} 
    	       }
           
           discoveredFiles.clear();       
           //Add the required number's files.
           for(int i=0; i<1; i++)
           {
        	   discoveredFiles.add(fileList.get(i));
           }
        }
    }     
	
	/**
	 * This method accesses the directory recursively, gets all the files under this directory. 
	 * @param currentDirectory
	 * The special file directory.
	 * @param fileList
	 * The scanned file list.
	 */
	private void scanFiles(File currentDirectory, List<File> fileList)
	{
		File[] files;
		Filter filter = endpoint.getFilter();
		//Filter the file or directory according to setting filter conditions.
		if (filter instanceof FileFilter)
		{
			files = currentDirectory.listFiles((FileFilter)filter);
		}
		else if(filter instanceof FilenameFilter)
		{
			files = currentDirectory.listFiles((FilenameFilter)filter);
		}
		else
		{        	
			files = currentDirectory.listFiles();
		}
		
		if (files == null || files.length == 0)
		{
			return;
		}
	   
		for(File file:files)
		{
			if (!file.isDirectory())
			{
				fileList.add(file);                
			}
			else
			{
				if (((FileConnector)connector).isRecursive())
				{
					this.scanFiles(file, fileList);                   
				}
			}
		}  
    }

    咱們定義了循環迭代訪問文件夾,獲取文件夾下全部文件的方法scanFiles,basicListFiles方法即調用這個方法獲取輪詢文件夾下的全部文件列表。spa

    咱們是否能夠直接從這個文件列表獲取單個文件列表discoveredFiles,返回給poll方法?回答是否認的,由於這個文件列表尚未通過排序。在默認的FileMessageReceiver中,因爲basicListFiles方法輪詢文件夾下全部文件返回給poll方法,由poll方法排序後再進行文件讀取。線程

@Override
    public void poll()
    {
      .........
      Comparator<File> comparator = getComparator();
      if (comparator != null)
      {
          Collections.sort(files, comparator);
      }

    然而因爲咱們返回的是的是單個文件列表,poll方法裏的排序將不會有效果,因此咱們必須在重載的basicListFiles方法中添加相應的排序代碼(上面代碼已經列出,再也不贅述)debug

   在自定義FileMessageReceiver類後,咱們還須要讓流程中的File inbound endpoint使用咱們定義的FileMessageReceiver類。咱們對流程文件作下列修改:日誌

<!--自定義全局File Connector,使用自定義的FileMessageReceiver類覆蓋了默認的FileMessageReceiver類 -->
<file:connector name="input" doc:name="File" recursive="true" > 
    <service-overrides messageReceiver="fileconnectortest.CustomFileMessageReceiver"/> 
</file:connector>

<flow name="fileconnectortestFlow" >
    <!--file inbound endpoint引用自定義的File Connector,使用了自定義的FileMessageReceiver類 -->
    <file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File" 
      connector-ref="input"/>
</flow>

    最後還有一個問題須要解決,因爲FileMessageReceiver和FileTransformer分別是不一樣的線程執行,當FileTransformer解析處理完前一個文件流時,如何通知FileMessageReceiver輪詢下一個文件?咱們使用了設置Mule上下文環境變量的辦法。  code

    1. 在流程文件中定義一個Mule環境變量processCompleteFlag,初始值爲trueorm

<global-property name="processCompleteFlag" value="true" doc:name="Global Property"/>

     2.修改重載的basicListFiles方法,加入環境變量檢查的代碼。   

@Override
	protected void basicListFiles(File currentDirectory, List<File> discoveredFiles)
    {	
		MuleContext muleContext = this.getFlowConstruct().getMuleContext();
		String processFlag = 
				muleContext.getRegistry().get("processCompleteFlag");
		
		if(processFlag.equals("false"))
		{
			logger.info("Not Processed");
			return;
		}
        .................
        if (files == null || files.length == 0)
        {    
        	try 
        	{
        		muleContext.getRegistry().unregisterObject("processCompleteFlag");
				muleContext.getRegistry().registerObject("processCompleteFlag", "true");
			} 
        	catch (RegistrationException e) 
        	{
        		logger.error(ExceptionUtils.getFullStackTrace(e));
			}
        	
        	logger.info("No Available Files");
        	logger.info("Process End");
            return;
        }       
        .........
        if(fileList.size() > 0)
        {
           ..........
        }
        else
        {
           try 
        	{
        		muleContext.getRegistry().unregisterObject("processCompleteFlag");
				muleContext.getRegistry().registerObject("processCompleteFlag", "true");
			} 
        	catch (RegistrationException e) 
        	{
        		logger.error(ExceptionUtils.getFullStackTrace(e));
			}
        	logger.info("No Available Files");
        	logger.info("Process End");
        }

     basicListFiles方法在輪詢指定文件夾前,會先讀取processCompleteFlag環境變量,若是變量爲false,說明FileTransformer在處理文件中,沒必要對文件夾進行輪詢。若是值爲true,則說明沒有文件在被處理中,能夠對文件夾進行輪詢。 若是輪詢過程當中出現輪詢文件夾爲空或者輪詢文件夾下只有文件夾,沒有文件,則返回空文件列表給poll方法,而且設置processCompleteFlag爲true,以便進行下一次輪詢。

    3.修改FileTransformer的transformMessage方法,添加設置processCompleteFlag的代碼。  

@Override
public Object transformMessage(MuleMessage message, 
       String outputEncoding) throws TransformerException
{
   logger.info("Process File");		
   try 
   {
	 muleContext.getRegistry().unregisterObject("processCompleteFlag");
	 muleContext.getRegistry().registerObject("processCompleteFlag", "false");
   } 
   catch (RegistrationException e) {
	 logger.error(ExceptionUtils.getFullStackTrace(e));
   }	 
   .....
   long endTime = System.currentTimeMillis();
   long elaspeTime = endTime - startTime; 
    	
   try 
   {
   		muleContext.getRegistry().unregisterObject("processCompleteFlag");
		muleContext.getRegistry().registerObject("processCompleteFlag", "true");
		logger.info("Set Complete Flag To True");
   } 
   catch (RegistrationException e) 
   {
		logger.error(ExceptionUtils.getFullStackTrace(e));
   }

     FileTransformer在處理文件流以前,先設置processCompleteFlag爲false,避免FileMessageReceiver沒必要要的輪詢。在當前文件流處理完畢後(全部ReadContentTask都結束任務後),再設置processCompleteFlag爲true,觸發FileMessageReceiver從新開始輪詢。

    咱們仍然使用系列(1)中使用過的students.csv文件,更名爲students1.csv,再建立一個名叫students2.csv的文件,學號從20160501開始,包含500個學生的信息。咱們把這兩個文件拷貝到輪詢目錄D:\connectorTest下。

    

啓動修改後的ESB項目,運行結果以下:

Console窗口輸出以下:

打開對應的日誌文件([Mule Workspace Directory]/.mule/logs/fileconnectortest.log文件)

從日誌文件能夠看出,FileMessageReceiver先輪詢了students1.csv文件,直到FileTransformer類處理完students1.csv文件,設置Flag變量爲true後纔開始輪詢students2.csv文件,證實咱們實現了File Connector一次輪詢一個文件的操做。

相關文章
相關標籤/搜索