正在開發的項目提出了一個要求,要求Mule ESB流程輪詢指定文件夾下的文本文件,逐行讀取文本文件的內容,進行業務處理。java
咱們考慮使用File Connector,對指定文件夾進行輪詢。File Connector使用org.mule.transport.file.FileMessageReceiver的poll方法,讀取文件夾下的每個文件,將其推送給後續的處理節點。 git
考慮處處理的文件可能很大,讀取文件和處理文件各行的操做咱們決定分開處理,使用了不一樣的後臺線程進行處理。各個後臺線程共享同一個阻塞隊列,以下圖所示,其中讀取文件的後臺線程是圖中的Thread 1,相似於生產者,它每讀取文件的一行,就將該行放入阻塞隊列中。而處理文件各行的線程是圖中的Thread 2,相似於消費者,它們從阻塞隊列裏讀出一行數據,進行處理。github
讀寫文件的後臺線程咱們統一使用Spring的線程池進行管理。解析文件,將各行數據放入阻塞隊列咱們只使用一個後臺線程,從阻塞隊列裏讀取數據使用5個後臺線程,每一個後臺線程從阻塞隊列裏讀取一行數據,輸出到控制檯。spring
Mule的File Connector輪詢指定文件夾,讀取文件夾中符合要求的文件,將其轉換爲InputStream(實際類型是org.mule.transport.file.ReceiverFileInputStream),傳送到File Parser節點。app
File Parser節點使用自定義的FileTransformer類接收傳送的InputStream,進行文件讀寫。FileTransformer的代碼以下 ide
public class FileTransformer extends AbstractMessageTransformer { //The blocking queue used to process file content. @Resource private ArrayBlockingQueue<String> contentQueue; //The thread pool executor to provide threads to process file content. @Autowired private ThreadPoolTaskExecutor queueExecutor; private Logger logger = LogManager.getLogger(FileTransformer.class); @Override public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { long startTime = System.currentTimeMillis(); //The file input stream sent by file connector. InputStream inputStream = (InputStream)message.getPayload(); PutContentTask putContentTask = new PutContentTask(contentQueue, inputStream); queueExecutor.execute(putContentTask); CountDownLatch threadSignal = new CountDownLatch(5); for(int i=0; i<5;i++) { String taskName = "ReadContentTask " + i; ReadContentTask readContentTask = new ReadContentTask(contentQueue, taskName, threadSignal); queueExecutor.execute(readContentTask); } try { threadSignal.await(); contentQueue.clear(); } catch (Exception e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } long endTime = System.currentTimeMillis(); long elaspeTime = endTime - startTime; return "The elapse time is:" + elaspeTime; } }
FileTransformer類中注入了一個Blocking Queue對象,用於存放掃描文件時得到的每一行文本,這個對象定義在Spring配置文件中。測試
<!--src/main/app/conf/applicationContext.xml--> <bean id="contentQueue" class="java.util.concurrent.ArrayBlockingQueue"> <constructor-arg value="1000"/> <constructor-arg value="true"/> </bean>
FileTransformer類中還注入了ThreadPoolTaskExecutor對象,用於發送解析文件和處理文件內容的Task。它也定義在Spring配置文件裏。線程
<!--src/main/app/conf/applicationContext.xml--> <bean id="queueExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="20" /> <property name="queueCapacity" value="200" /> <property name="keepAliveSeconds" value="20000" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" /> </property> </bean>
在transformMessage方法中,當File Connector推送文件Input Stream過來時,咱們構建一個PutContentTask,用於解析文件流將文件的各行文本數據放入Blocking Queue中。PutContentTask類的主要代碼以下:code
public class PutContentTask implements Runnable { //This blocking queue is used to contain each line of file. private ArrayBlockingQueue<String> contentQueue = null; //The file input stream. private InputStream inputStream; .................... @Override public void run() { try { Scanner scanner = new Scanner(inputStream); while(scanner.hasNext()) { String line = scanner.nextLine(); contentQueue.put(line); } //Put EOF string to line to notify read task that we reach the end of file. contentQueue.put("EOF"); scanner.close(); } catch (InterruptedException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } finally { //Close the file input stream so that mule can auto delete the file. if(inputStream != null) { try { inputStream.close(); } catch (IOException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } } } }
PutContentTask在Input Stream解析結束後,會向Blocking Queue中放一個"EOF"標誌,表示文件已經解析完成。讀取隊列數據的ReadContentTask在讀取到這個標識後,會結束讀取操做。orm
在將Input Stream交給PutContentTask去解析後,transformMessage方法建立了5個ReadContentTask,從Blocking Queue中讀取ReadContentTask解析出的文件的每一行並作處理(我這個程序只是將讀取的每一行輸出到控制檯,讀者能夠根據本身的需求編寫相應的業務邏輯)。五個ReadContentTask使用CountDownLatch進行同步,等CountDownLatch計數爲0時,File Connector推送的文件Input Stream解析和處理結束,統計總共的處理時間。
ReadContentTask類的主要代碼以下:
public class ReadContentTask implements Runnable { //This blocking queue is used to contain each line of file. private ArrayBlockingQueue<String> contentQueue = null; //The task name of read content task(It identifies each read task). private String taskName; //The synchronous signal variable for each read task. private CountDownLatch signal = null; private Logger logger = LogManager.getLogger(ReadContentTask.class); @Override public void run() { while(true) { if(!contentQueue.isEmpty()) { try { String line = contentQueue.take(); if(line!=null && !line.equals("") && !line.equals("EOF")) { //process each line. System.out.println(taskName + " Line is:" + line); } else { //Notify other read task. contentQueue.put("EOF"); break; } } catch (InterruptedException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } } } signal.countDown(); }
對於每一個ReadContentTask,從Blocking Queue中讀取到"EOF"後,即視爲文件已經解析完成,結束處理任務,同時向contentQueue中放入"EOF",通知其餘ReadContentTask結束處理。
當全部ReadContentTask都結束處理後,transformMessage方法將清空Blocking Queue,等待下一次文件解析和處理。
try { threadSignal.await(); contentQueue.clear(); } catch (Exception e) { logger.error(ExceptionUtils.getFullStackTrace(e)); }
爲了顯示文件的解析和處理時間,FileTransformer類將計算出的elapse time放入mule message的payload中,咱們添加一個Logger對象顯示處理時間。
測試的時候,咱們建立一個簡單的csv文件students.csv(以下圖所示),記錄500個學生的學號,姓名和所屬班級,放到File Connector監控的connectorTest目錄下。
啓動ESB項目,能夠看到students.csv文件被解析,文件的每行被不一樣的ReadContentTask輸出到控制檯,處理的時間是31ms
項目的代碼上傳到了github上,地址是
https://github.com/yosaku01/fileconnectortest