本文討論ActiveMQ傳輸文件的幾種方法的原理及其利弊,做爲消息發送、直接傳輸文件、使用ftp或http中轉。最後介紹擴展ActiveMQ實現自定義文件傳輸方式,討論如何實現高效的文件傳輸。by kimmkinghtml
按照JMS規範,爲了保證可靠性,全部的消息都應該是發送到broker,而後交由broker來投遞的。也便是說其實JMS是不建議或不支持傳輸文件的。數據庫
對於比較小的文件,簡單的處理方式是先讀取全部的文件成byte[],而後使用ByteMessage,把文件數據發送到broker,像正常的message同樣處理。對於大文件,例如1GB以上的文件,這麼搞直接把client或是broker給oom掉了。apache
這種方式僅僅適用於小文件的傳輸。特別是若是broker端使用數據庫做爲存儲,message序列化之後存放於blob字段,文件傳輸頻繁或是稍微有點大,寫入效率極低。服務器
爲了解決傳輸大文件的問題,ActiveMQ在jms規範以外引入了jms streams的概念。PTP模式下,連到同一個destination的兩端,能夠經過broker中轉來傳輸大文件。session
發送端使用connection.createOutputStream打開一個輸出流,往流裏寫文件。tcp
OutputStream out =connection.createOutputStream(destination);大數據
接收端則簡單的使用connection.createInputStream拿到一個輸入流,從中讀取文件數據便可。url
InputStream in = connection.createInputStream(destination)
詳見:http://activemq.apache.org/jms-streams.htmlspa
使用很是簡單。ActiveMQ在中間作了什麼事情呢?代理
其實過程蠻曲折的,發送端拿到文件後,首先分片,默認64K文件數據爲一個byte message,而後依次把全部的message發送到broker,broker轉發給接收端,最後發送一個空消息做爲結束符。
connection上提供了兩個建立OutputStream的方法,一個是createOutputStream建立的是持久化的消息集合,這些數據會寫到磁盤或是數據庫(對大文件來講慢消費也是一件可怕的事兒);一個是createNonPersistOutputStream建立的是非持久化消息集合,不會寫到磁盤上,若是沒有及時消費掉就慘了。
文件片斷的byte message的TTL設置爲0,就是不會超時進入DLQ。
優點:簡單直接,處理很是小(不大於64K)的文件很是方便。
劣勢:對大文件,簡直就是噩夢。
使用消息的方式來傳遞大文件,明顯不是一個有效率的辦法。文件應該就是按文件的方式去處理。
若是本身處理文件的話,一個簡單方式是使用共享或ftp、dfs等方式,先把文件發送到一個你們均可以拿到的地方,而後發送message,payload或properties中包含文件的路徑信息。這樣,consumer拿到文件路徑後去指定的地方,按照給定的方式去獲取文件數據便可。
優點:這種方式能夠用來處理大數據,而且不須要client或broker在內存中持有文件數據自己,很是的節省資源。並且文件是經過額外的方式處理,跟ActiveMQ自己無關,因此符合jms協議、處理的效率也相對比較高。
劣勢:須要本身處理不少文件相關的操做。
幸運的是,ActiveMQ把上面繁複的文件處理工做進行了封裝,屏蔽掉文件中轉的整個處理過程,使得咱們可使用相似jms規範的API來簡單操做文件傳輸。
舉個例子來講,典型的使用步驟:
發送端:
1. 啓動ActiveMQ時,也啓動jetty(即activemq.xml中有import jetty.xml),此時jetty中運行了一個ActiveMQ自帶的http文件服務器
2. 使用tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/建立connection,而後建立session和producer
3. 使用以下代碼發送文件:
BlobMessageblobMessage = session.createBlobMessage(file);
blobMessage.setStringProperty("FILE.NAME",file.getName());
blobMessage.setLongProperty("FILE.SIZE",file.length());
producer.send(blobMessage);
接收端比較簡單,正常的使用jms接收到消息:
InputStream inputStream = blobMessage.getInputStream();
而後直接讀取文件數據便可。文件名和文件大小能夠從message的屬性中拿到。
這個過程當中ActiveMQ作了什麼呢?
發送端:producer.send的時候,把文件經過http協議的PUT方法發到jetty中的fileserver(默認128K走http的chunk分片傳輸)。而後把http的url寫入消息中。再把消息發送到broker。
接收端:接收到消息之後,發現是BlobMessage,拿到url,直接使用GET方法獲取文件數據。處理完畢後,使用DELETE方法從fileserver刪除文件。
BlobMessage支持3種文件中轉方式:
FILE
要求client和broker在同一個機器或者使用同一個共享存儲。發送文件的時候,把文件從本地寫入到指定路徑。接收文件的時候,把文件今後路徑讀出來。
HTTP
使用http的fileserver,PUT/GET/DELETE方法。ActiveMQ自帶了簡單的實現。就是前面場景中使用的方式。
FTP
使用一個獨立的ftpserver做爲文件中轉方式。發送文件的時候,把文件發送到ftp服務器。接收文件的時候,從ftp把文件讀取下來。
詳見:http://activemq.apache.org/blob-messages.html
優點:消息處理與文件處理傳輸分開,極大的提升了文件傳輸的效率。並且可使用相似jms協議的方式來處理文件發送。
劣勢:FILE方式不太實用。HTTP和FTP方式都須要額外的fileserver。
ActiveMQ實現BlobMessage的三種文件中轉方式時,使用了Façade和Strategy模式。ActiveMQBlobMessage須要在發送消息時使用blobUploader上傳文件、接收消息時使用blobDownloader下載文件。每種中轉方式的這兩個操做分別使用BlobUploadStrategy和BlobDownloadStrategy封裝。
因此,咱們也能夠根據ActiveMQBlobMessage文件傳送的原理,實現本身的定製方式:
一、 給ActiveMQBlobMessage添加本身的blobDownloader和blobUploader來實現文件的處理。
二、 擴展這個文件中起色制,實現BlobUploadStrategy和BlobDownloadStrategy。
一個更高效且不須要額外fileserver的實現思路是:broker上再打開一個tcp的監聽端口,用來接收和轉發文件,當發送和接收端都在時,broker僅僅做爲一個傳輸代理。接收端不在時,broker把數據存爲本地臨時文件,處理完畢後刪除掉。之因此使用一個新的端口來傳輸文件數據而不是已有的transport,是爲了不jms streams這種命令和數據混合的模式。能夠參考ftp協議,做爲一種高效的文件傳輸協議,它有一個很大的特色就是命令的處理,和數據傳輸的處理使用不一樣的端口和鏈接。而ActiveMQ使用的openwire協議其實就是一個個的操做命令。文件分片、包裝、序列化,到另外一頭再反向這個過程,無疑是效率很低下的