ActiveMQ 文件傳輸

官方文檔:http://activemq.apache.org/message-features.htmlhtml

文檔中引用其餘網頁內容!
java


方案數據庫

使用場景apache

優勢緩存

缺點服務器

消息發送session

小文件傳輸(文件轉換爲byte[],而後安裝正常消息傳送)tcp

小文件簡單方便url

傳輸大文件效率低下spa

JMS Stream(棄用)

大文件傳輸(4.2版本前的用戶)

以流的形式傳輸,解決大文件傳輸的問題

官方文檔已經棄用該方案。推薦BlobMessage

自定義文件中轉

大文件傳輸

不須要使用broker來傳輸文件,節省資源,效率高

須要獨立FTP或者File服務器,且要處理複雜的io等方面的問題。

BlobMessage(推薦)

大文件傳輸

ActiveMQ封裝複雜過程,提供方便的接口調用,內置Jetty提供httpServer,方便簡單效率高

支持filehttpftp三種方式,內置httpserver,若是使用file方式須要自行搭建。

消息發送有以下幾種方法:

1.做爲消息發送,先讀取全部的文件成byte[],而後使用ByteMessage,把文件數據發送到broker,像正常的message同樣處理。只適合小文件發送。

2.JMS Stream

e.g.
ActiveMQConnection connection = ...;
Destination destination = new ActiveMQQueue("FOO.BAR");
OutputStream out = connection.createOutputStream(destination);
// write the file to out
out.close();

Or to consume a large message
ActiveMQConnection connection = ...;
Destination destination = new ActiveMQQueue("FOO.BAR");
InputStream in = connection.createInputStream(destination)
// read the stream...
in.close();

發送端拿到文件後,首先分片,默認64K文件數據爲一個byte message,而後依次把全部的message發送到brokerbroker轉發給接收端,最後發送一個空消息做爲結束符。

connection上提供了兩個建立OutputStream的方法,一個是createOutputStream建立的是持久化的消息集合,這 些數據會寫到磁盤或是數據庫(對大文件來講慢消費也是一件可怕的事兒);一個是createNonPersistOutputStream建立的是非持久 化消息集合,不會寫到磁盤上,若是沒有及時消費掉就慘了。

文件片斷的byte messageTTL設置爲0,就是不會超時進入DLQ

適合小文件傳輸,特別是小於片(64k)大小的文件傳輸。

3. ActiveMQ把上面繁複的文件處理工做進行了封裝,屏蔽掉文件中轉的整個處理過程,使得咱們可使用相似jms規範的API來簡單操做文件傳輸。

發送端:

3.1

啓動ActiveMQ時,也啓動jetty(activemq.xml中有import jetty.xml),此時jetty中運行了一個ActiveMQ自帶的http文件服務器

3.2       

tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/建立connection,而後建立sessionproducer

3.3 

使用以下代碼發送文件:

BlobMessageblobMessage = session.createBlobMessage(file); 
blobMessage.setStringProperty("FILE.NAME",file.getName()); 
blobMessage.setLongProperty("FILE.SIZE",file.length()); 
producer.send(blobMessage);

接收端:

InputStream inputStream = blobMessage.getInputStream();

而後直接讀取文件數據便可。文件名和文件大小能夠從message的屬性中拿到。

3.4過程講解:

發送端:producer.send的時候,把文件經過http協議的PUT方法發到jetty中的fileserver(默認128Khttpchunk分片傳輸)。而後把httpurl寫入消息中。再把消息發送到broker

接收端:接收到消息之後,發現是BlobMessage,拿到url,直接使用GET方法獲取文件數據。處理完畢後,使用DELETE方法從fileserver刪除文件。

3.5 BlobMessage支持3種文件中轉方式:

FILE

要求clientbroker在同一個機器或者使用同一個共享存儲。發送文件的時候,把文件從本地寫入到指定路徑。接收文件的時候,把文件今後路徑讀出來。

HTTP

使用httpfileserverPUT/GET/DELETE方法。ActiveMQ自帶了簡單的實現。就是前面場景中使用的方式。

FTP

使用一個獨立的ftpserver做爲文件中轉方式。發送文件的時候,把文件發送到ftp服務器。接收文件的時候,從ftp把文件讀取下來。

3.6 ActiveMQ 對大文件進行傳輸的時候,有四種方式:

1. BlobUploader:該方法採用內置的Jetty,有個HttpServer服務器

2. FTPBlobDownloadStratrgy:該方法須要自行搭建FTP服務器。

3. FileSystemBlobStrategy:該方法須要搭建文件服務器。

4. DefaultBlobDownloadStrategy:該方法採用缺省方式。

3.7 內部原理

發送端:producer.send的時候,把文件經過http協議的PUT方法發到jetty中的fileserver(默認128Khttpchunk分片傳輸)。而後把httpurl寫入消息中。再把消息發送到broker

接收端:接收到消息之後,發現是BlobMessage,拿到url,直接使用GET方法獲取文件數據。處理完畢後,使用DELETE方法從fileserver刪除文件。

文件發送:

import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.swing.JFileChooser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class FileSender {

	public static void main(String[] args) {
		File file = getFile();
		// 獲取 ConnectionFactory
		// Activemq內置Http服務器(Jetty內置)
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/");
		MessageProducer producer = null;
		ActiveMQSession session = null;
		Connection connection = null;
		try {
			// 建立 Connection
			connection = connectionFactory.createConnection();
			connection.start();
			// 建立 Session
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// 建立 Destination
			Destination destination = session.createQueue("File.Transport");
			// 建立 Producer
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置爲持久性
			// 設置持久性的話,文件也能夠先緩存下來,接收端離線再鏈接也能夠收到文件
			// 構造 BlobMessage,用來傳輸文件
			BlobMessage blobMessage = session.createBlobMessage(file);
			// 經過set方法對對象屬性進行賦值
			blobMessage.setStringProperty("FILE.NAME", file.getName());
			blobMessage.setLongProperty("FILE.SIZE", file.length());
			System.out.println("開始發送文件:" + file.getName() + ",文件大小:"
					+ file.length() + " 字節");
			// 7. 發送文件
			producer.send(blobMessage);
			System.out.println("完成文件發送:" + file.getName());
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				producer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	private static File getFile() {
		// 選擇要上傳的文件
		JFileChooser fileChooser = new JFileChooser();
		fileChooser.setDialogTitle("請選擇要傳送的文件");
		if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) {
			return null;
		}
		File file = fileChooser.getSelectedFile();
		return file;
	}
}

文件接收:

import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.swing.JFileChooser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
public class FileSender {
	public static void main(String[] args) {
		File file = getFile();
		// 獲取 ConnectionFactory
		// Activemq內置Http服務器(Jetty內置)
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/");
		MessageProducer producer = null;
		ActiveMQSession session = null;
		Connection connection = null;
		try {
			// 建立 Connection
			connection = connectionFactory.createConnection();
			connection.start();
			// 建立 Session
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// 建立 Destination
			Destination destination = session.createQueue("File.Transport");
			// 建立 Producer
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置爲持久性
			// 設置持久性的話,文件也能夠先緩存下來,接收端離線再鏈接也能夠收到文件
			// 構造 BlobMessage,用來傳輸文件
			BlobMessage blobMessage = session.createBlobMessage(file);
			// 經過set方法對對象屬性進行賦值
			blobMessage.setStringProperty("FILE.NAME", file.getName());
			blobMessage.setLongProperty("FILE.SIZE", file.length());
			System.out.println("開始發送文件:" + file.getName() + ",文件大小:"
					+ file.length() + " 字節");
			// 7. 發送文件
			producer.send(blobMessage);
			System.out.println("完成文件發送:" + file.getName());
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				producer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	private static File getFile() {
		// 選擇要上傳的文件
		JFileChooser fileChooser = new JFileChooser();
		fileChooser.setDialogTitle("請選擇要傳送的文件");
		if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) {
			return null;
		}
		File file = fileChooser.getSelectedFile();
		return file;
	}
}
相關文章
相關標籤/搜索