官方文檔:http://activemq.apache.org/message-features.htmlhtml
文檔中引用其餘網頁內容!
java
方案數據庫 |
使用場景apache |
優勢緩存 |
缺點服務器 |
消息發送session |
小文件傳輸(文件轉換爲byte[],而後安裝正常消息傳送)tcp |
小文件簡單方便url |
傳輸大文件效率低下spa |
JMS Stream(棄用) |
大文件傳輸(4.2版本前的用戶) |
以流的形式傳輸,解決大文件傳輸的問題 |
官方文檔已經棄用該方案。推薦BlobMessage |
自定義文件中轉 |
大文件傳輸 |
不須要使用broker來傳輸文件,節省資源,效率高 |
須要獨立FTP或者File服務器,且要處理複雜的io等方面的問題。 |
BlobMessage(推薦) |
大文件傳輸 |
ActiveMQ封裝複雜過程,提供方便的接口調用,內置Jetty提供httpServer,方便簡單效率高 |
支持file,http,ftp三種方式,內置httpserver,若是使用file方式須要自行搭建。 |
消息發送有以下幾種方法:
1.做爲消息發送,先讀取全部的文件成byte[],而後使用ByteMessage,把文件數據發送到broker,像正常的message同樣處理。只適合小文件發送。
2.JMS Stream
e.g. ActiveMQConnection connection = ...; Destination destination = new ActiveMQQueue("FOO.BAR"); OutputStream out = connection.createOutputStream(destination); // write the file to out out.close(); Or to consume a large message ActiveMQConnection connection = ...; Destination destination = new ActiveMQQueue("FOO.BAR"); InputStream in = connection.createInputStream(destination) // read the stream... in.close();
發送端拿到文件後,首先分片,默認64K文件數據爲一個byte message,而後依次把全部的message發送到broker,broker轉發給接收端,最後發送一個空消息做爲結束符。
connection上提供了兩個建立OutputStream的方法,一個是createOutputStream建立的是持久化的消息集合,這 些數據會寫到磁盤或是數據庫(對大文件來講慢消費也是一件可怕的事兒);一個是createNonPersistOutputStream建立的是非持久 化消息集合,不會寫到磁盤上,若是沒有及時消費掉就慘了。
文件片斷的byte message的TTL設置爲0,就是不會超時進入DLQ。
適合小文件傳輸,特別是小於片(64k)大小的文件傳輸。
3. ActiveMQ把上面繁複的文件處理工做進行了封裝,屏蔽掉文件中轉的整個處理過程,使得咱們可使用相似jms規範的API來簡單操做文件傳輸。
發送端:
3.1
啓動ActiveMQ時,也啓動jetty(即activemq.xml中有import jetty.xml),此時jetty中運行了一個ActiveMQ自帶的http文件服務器
3.2
tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/建立connection,而後建立session和producer
3.3
使用以下代碼發送文件:
BlobMessageblobMessage = session.createBlobMessage(file); blobMessage.setStringProperty("FILE.NAME",file.getName()); blobMessage.setLongProperty("FILE.SIZE",file.length()); producer.send(blobMessage);
接收端:
InputStream inputStream = blobMessage.getInputStream();
而後直接讀取文件數據便可。文件名和文件大小能夠從message的屬性中拿到。
3.4過程講解:
發送端:producer.send的時候,把文件經過http協議的PUT方法發到jetty中的fileserver(默認128K走http的chunk分片傳輸)。而後把http的url寫入消息中。再把消息發送到broker。
接收端:接收到消息之後,發現是BlobMessage,拿到url,直接使用GET方法獲取文件數據。處理完畢後,使用DELETE方法從fileserver刪除文件。
3.5 BlobMessage支持3種文件中轉方式:
FILE
要求client和broker在同一個機器或者使用同一個共享存儲。發送文件的時候,把文件從本地寫入到指定路徑。接收文件的時候,把文件今後路徑讀出來。
HTTP
使用http的fileserver,PUT/GET/DELETE方法。ActiveMQ自帶了簡單的實現。就是前面場景中使用的方式。
FTP
使用一個獨立的ftpserver做爲文件中轉方式。發送文件的時候,把文件發送到ftp服務器。接收文件的時候,從ftp把文件讀取下來。
3.6 ActiveMQ 對大文件進行傳輸的時候,有四種方式:
1. BlobUploader:該方法採用內置的Jetty,有個HttpServer服務器
2. FTPBlobDownloadStratrgy:該方法須要自行搭建FTP服務器。
3. FileSystemBlobStrategy:該方法須要搭建文件服務器。
4. DefaultBlobDownloadStrategy:該方法採用缺省方式。
3.7 內部原理
發送端:producer.send的時候,把文件經過http協議的PUT方法發到jetty中的fileserver(默認128K走http的chunk分片傳輸)。而後把http的url寫入消息中。再把消息發送到broker。
接收端:接收到消息之後,發現是BlobMessage,拿到url,直接使用GET方法獲取文件數據。處理完畢後,使用DELETE方法從fileserver刪除文件。
文件發送:
import java.io.File; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.swing.JFileChooser; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class FileSender { public static void main(String[] args) { File file = getFile(); // 獲取 ConnectionFactory // Activemq內置Http服務器(Jetty內置) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/"); MessageProducer producer = null; ActiveMQSession session = null; Connection connection = null; try { // 建立 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立 Destination Destination destination = session.createQueue("File.Transport"); // 建立 Producer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置爲持久性 // 設置持久性的話,文件也能夠先緩存下來,接收端離線再鏈接也能夠收到文件 // 構造 BlobMessage,用來傳輸文件 BlobMessage blobMessage = session.createBlobMessage(file); // 經過set方法對對象屬性進行賦值 blobMessage.setStringProperty("FILE.NAME", file.getName()); blobMessage.setLongProperty("FILE.SIZE", file.length()); System.out.println("開始發送文件:" + file.getName() + ",文件大小:" + file.length() + " 字節"); // 7. 發送文件 producer.send(blobMessage); System.out.println("完成文件發送:" + file.getName()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } private static File getFile() { // 選擇要上傳的文件 JFileChooser fileChooser = new JFileChooser(); fileChooser.setDialogTitle("請選擇要傳送的文件"); if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) { return null; } File file = fileChooser.getSelectedFile(); return file; } }
文件接收:
import java.io.File; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.swing.JFileChooser; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class FileSender { public static void main(String[] args) { File file = getFile(); // 獲取 ConnectionFactory // Activemq內置Http服務器(Jetty內置) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/"); MessageProducer producer = null; ActiveMQSession session = null; Connection connection = null; try { // 建立 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立 Destination Destination destination = session.createQueue("File.Transport"); // 建立 Producer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置爲持久性 // 設置持久性的話,文件也能夠先緩存下來,接收端離線再鏈接也能夠收到文件 // 構造 BlobMessage,用來傳輸文件 BlobMessage blobMessage = session.createBlobMessage(file); // 經過set方法對對象屬性進行賦值 blobMessage.setStringProperty("FILE.NAME", file.getName()); blobMessage.setLongProperty("FILE.SIZE", file.length()); System.out.println("開始發送文件:" + file.getName() + ",文件大小:" + file.length() + " 字節"); // 7. 發送文件 producer.send(blobMessage); System.out.println("完成文件發送:" + file.getName()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } private static File getFile() { // 選擇要上傳的文件 JFileChooser fileChooser = new JFileChooser(); fileChooser.setDialogTitle("請選擇要傳送的文件"); if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) { return null; } File file = fileChooser.getSelectedFile(); return file; } }