Mule ESB File Connector輪詢單個文件的實現(1)

        正在開發的項目提出了一個要求,要求Mule ESB流程輪詢指定文件夾下的文本文件,逐行讀取文本文件的內容,進行業務處理。java

        咱們考慮使用File Connector,對指定文件夾進行輪詢。File Connector使用org.mule.transport.file.FileMessageReceiver的poll方法,讀取文件夾下的每個文件,將其推送給後續的處理節點。      git

         考慮處處理的文件可能很大,讀取文件和處理文件各行的操做咱們決定分開處理,使用了不一樣的後臺線程進行處理。各個後臺線程共享同一個阻塞隊列,以下圖所示,其中讀取文件的後臺線程是圖中的Thread 1,相似於生產者,它每讀取文件的一行,就將該行放入阻塞隊列中。而處理文件各行的線程是圖中的Thread 2,相似於消費者,它們從阻塞隊列裏讀出一行數據,進行處理。github

        

         讀寫文件的後臺線程咱們統一使用Spring的線程池進行管理。解析文件,將各行數據放入阻塞隊列咱們只使用一個後臺線程,從阻塞隊列裏讀取數據使用5個後臺線程,每一個後臺線程從阻塞隊列裏讀取一行數據,輸出到控制檯。spring

        Mule的File Connector輪詢指定文件夾,讀取文件夾中符合要求的文件,將其轉換爲InputStream(實際類型是org.mule.transport.file.ReceiverFileInputStream),傳送到File Parser節點。app

       File Parser節點使用自定義的FileTransformer類接收傳送的InputStream,進行文件讀寫。FileTransformer的代碼以下      ide

public class FileTransformer extends AbstractMessageTransformer {

	//The blocking queue used to process file content.
	@Resource
	private ArrayBlockingQueue<String> contentQueue;	
	
	//The thread pool executor to provide threads to process file content.
	@Autowired
	private ThreadPoolTaskExecutor queueExecutor;
	
	private Logger logger = LogManager.getLogger(FileTransformer.class);
	
	
	@Override
	public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {
		
		long startTime = System.currentTimeMillis();
		
		//The file input stream sent by file connector.
		InputStream inputStream = 
				(InputStream)message.getPayload();
		
		PutContentTask putContentTask =
    			new PutContentTask(contentQueue, inputStream);
    	queueExecutor.execute(putContentTask);  
    	
    	CountDownLatch threadSignal = new CountDownLatch(5);
    	
    	for(int i=0; i<5;i++)
    	{
    		String taskName = "ReadContentTask " + i;
    		ReadContentTask readContentTask =
    				new ReadContentTask(contentQueue, taskName, threadSignal);
    		queueExecutor.execute(readContentTask);
    	}
    	
    	try 
    	{
			threadSignal.await();			
			contentQueue.clear();
		} 
    	catch (Exception e) {
			logger.error(ExceptionUtils.getFullStackTrace(e));
		} 
    	
    	long endTime = System.currentTimeMillis();
    	long elaspeTime = endTime - startTime;    	
		
    	return "The elapse time is:" + elaspeTime;
	}

}

 FileTransformer類中注入了一個Blocking Queue對象,用於存放掃描文件時得到的每一行文本,這個對象定義在Spring配置文件中。測試

<!--src/main/app/conf/applicationContext.xml-->
<bean id="contentQueue" class="java.util.concurrent.ArrayBlockingQueue">
		<constructor-arg value="1000"/>
		<constructor-arg value="true"/>
</bean>

FileTransformer類中還注入了ThreadPoolTaskExecutor對象,用於發送解析文件和處理文件內容的Task。它也定義在Spring配置文件裏。線程

<!--src/main/app/conf/applicationContext.xml-->

<bean id="queueExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="20" />
        <property name="queueCapacity" value="200" />
        <property name="keepAliveSeconds" value="20000" />
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" />
        </property>
</bean>

在transformMessage方法中,當File Connector推送文件Input Stream過來時,咱們構建一個PutContentTask,用於解析文件流將文件的各行文本數據放入Blocking Queue中。PutContentTask類的主要代碼以下:code

public class PutContentTask implements Runnable {
	//This blocking queue is used to contain each line of file.
	private ArrayBlockingQueue<String> contentQueue = null;
	//The file input stream.
	private InputStream inputStream;

....................
@Override
	public void run() 
	{
			try 
			{						
				Scanner scanner = new Scanner(inputStream);
				while(scanner.hasNext())
				{
					String line = scanner.nextLine();
					contentQueue.put(line);				
				}
				//Put EOF string to line to notify read task that we reach the end of file.
				contentQueue.put("EOF");
				scanner.close();
			} catch (InterruptedException e) {
				logger.error(ExceptionUtils.getFullStackTrace(e));
			}
			finally
			{
				//Close the file input stream so that mule can auto delete the file.
				if(inputStream != null)
				{
					try {
						inputStream.close();
					} catch (IOException e) {
						logger.error(ExceptionUtils.getFullStackTrace(e));
					}
				}						
			}
		}

PutContentTask在Input Stream解析結束後,會向Blocking Queue中放一個"EOF"標誌,表示文件已經解析完成。讀取隊列數據的ReadContentTask在讀取到這個標識後,會結束讀取操做。orm

在將Input Stream交給PutContentTask去解析後,transformMessage方法建立了5個ReadContentTask,從Blocking Queue中讀取ReadContentTask解析出的文件的每一行並作處理(我這個程序只是將讀取的每一行輸出到控制檯,讀者能夠根據本身的需求編寫相應的業務邏輯)。五個ReadContentTask使用CountDownLatch進行同步,等CountDownLatch計數爲0時,File Connector推送的文件Input Stream解析和處理結束,統計總共的處理時間。

ReadContentTask類的主要代碼以下:

public class ReadContentTask implements Runnable {

  //This blocking queue is used to contain each line of file.
  private ArrayBlockingQueue<String> contentQueue = null;	
  //The task name of read content task(It identifies each read task).
  private String taskName;	
  //The synchronous signal variable for each read task.
  private CountDownLatch signal = null;	
  private Logger logger = LogManager.getLogger(ReadContentTask.class); 

  @Override
  public void run() 
  {
		while(true)
		{				
				if(!contentQueue.isEmpty())
				{
					try 
					{
						String line = contentQueue.take();
						if(line!=null && !line.equals("") && !line.equals("EOF"))
						{
							//process each line.
							System.out.println(taskName + " Line is:" + line);
						}
						else
						{
							//Notify other read task.
							contentQueue.put("EOF");
							break;
						}
					} catch (InterruptedException e) {
							logger.error(ExceptionUtils.getFullStackTrace(e));
					}
				}
		}
		signal.countDown();
		
	}

對於每一個ReadContentTask,從Blocking Queue中讀取到"EOF"後,即視爲文件已經解析完成,結束處理任務,同時向contentQueue中放入"EOF",通知其餘ReadContentTask結束處理。

當全部ReadContentTask都結束處理後,transformMessage方法將清空Blocking Queue,等待下一次文件解析和處理。

try 
  {
     threadSignal.await();			
	 contentQueue.clear();
  } 
  catch (Exception e) {
		logger.error(ExceptionUtils.getFullStackTrace(e));
  }

爲了顯示文件的解析和處理時間,FileTransformer類將計算出的elapse time放入mule message的payload中,咱們添加一個Logger對象顯示處理時間。

測試的時候,咱們建立一個簡單的csv文件students.csv(以下圖所示),記錄500個學生的學號,姓名和所屬班級,放到File Connector監控的connectorTest目錄下。

啓動ESB項目,能夠看到students.csv文件被解析,文件的每行被不一樣的ReadContentTask輸出到控制檯,處理的時間是31ms

項目的代碼上傳到了github上,地址是

https://github.com/yosaku01/fileconnectortest

相關文章
相關標籤/搜索