ActiveMQ實現Master-Slave的方式有幾種,一種是使用Jdbc Peristent的方式,未被Consumer讀取的消息和死消息寫在數據庫的ActiveMQ_MSGS表中,還有一種是使用LevelDB文件,這些消息將被存儲在Level DB的.log文件中。java
如今咱們項目有一個需求是將原先使用Jdbc Persistent的ActiveMQ轉換爲使用Zk + Replicated Level DB的ActiveMQ,這就產生了一個問題:如何將原先保存在ActiveMQ_MSGS表的消息導入到新ActiveMQ服務器的LevelDB數據文件中?數據庫
咱們查看了使用jdbc peristent的ACTIVEMQ_MSGS表數據(以MySQL數據庫爲例)windows
能夠看出存儲在數據庫裏的消息是以Blob形式保存的。服務器
使用leveldb形式保存的消息數據的形式是二進制形式的session
咱們能夠看出二者的數據存儲格式徹底不同,所以不可能像jdbc peristent遷移那樣,將activemq_msgs表數據從一個數據庫遷移到另外一個數據庫那樣簡單。只能採用從源ActiveMQ服務器讀取隊列消息,再將隊列消息發送到目標ActiveMQ的方式。架構
咱們搭建了兩個CentOS虛擬機,虛擬機A上是源ActiveMQ,使用jdbc persistent(MySQL),有三個Queue:TestQueue1,TestQueue2,TestQueue3.app
虛擬機B使用LevelDB作數據持久化。dom
1.單ActiveMQ,單隊列消息遷移async
TestQueue1有三條持久化消息tcp
消息的內容咱們可使用activemq-admin.bat查看(這個只在windows環境下可用),這個命令支持獲取遠程ActiveMQ信息。
activemq-admin.bat browse --amqurl tcp://xxx.xxx.xxx.xxx:61616 TestQueue1
得到的TestQueue1隊列信息以下所示
JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = Test Queue1's first message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-46264-1476266938779-4:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476267006371 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue's second message JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326826475 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue1's third message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:2 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326873071
咱們數據遷移的基本思路,是從虛擬機A的ActiveMQ上讀取TestQueue1的持久化消息,發送給虛擬機B的ActiveMQ的TestQueue1.
讀取TestQueue1隊列消息的代碼片斷以下(這段代碼參考了王新春的代碼, 根據實際作了一點修改,特此感謝。http://wangxinchun.iteye.com/blog/2146120?utm_source=tuicool&utm_medium=referral)
String url = "tcp://xxx.xxx.xxx.xxx:61616"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", url); ActiveMQConnection connection = null; try { connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue("TestQueue1"); QueueBrowser browser = session.createBrowser(destination); Enumeration<?> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); System.out.println("Browsing: " + message.getText()); } session.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
上述代碼只是從源ActiveMQ的TestQueue1讀取全部持久化消息並將每條消息正文輸出到控制檯,接下來咱們對這段代碼進行改造,將獲取的消息發送到目標ActiveMQ的TestQueue1。
咱們將每一條消息放到一個消息List中。而後鏈接目標ActiveMQ,將消息List中的每一條消息發送到TestQueue1。代碼片斷以下:
List<TextMessage> messageList = new ArrayList<TextMessage>(); .......... Enumeration<?> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); messageList.add(message); } .......... sendMessages(messageList); .......... private static void sendMessages(List<TextMessage> messageList) { try { String sendActiveMQUrl = "tcp://xxx.xxx.xxx.xxx:61616"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(sendActiveMQUrl); connectionFactory.setUserName("admin"); connectionFactory.setPassword("admin"); ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.setDisableTimeStampsByDefault(true); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目標,就建立主題也能夠建立隊列 Destination destination = session.createQueue("TestQueue1"); //建立消息生產者 MessageProducer producer = session.createProducer(destination); //設置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(TextMessage message:messageList) { // 發送消息到ActiveMQ producer.send(message); TimeUnit.MILLISECONDS.sleep(10); } // 關閉資源 session.close(); connection.close(); System.out.println("The messages import is completed."); } catch(Exception ex) { ex.printStackTrace(); } }
上述代碼中有一句代碼請特別注意
connection.setDisableTimeStampsByDefault(true);
這句代碼是控制是否使用JmsMessage自己自帶的TimeStamp屬性。ActiveMQConnection類的disableTimeStampsByDefault屬性的API描述是這樣的(基於ActiveMQ 5.14.0的代碼)
/** * Sets whether or not timestamps on messages should be disabled or not. If * you disable them it adds a small performance boost. */ public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { this.disableTimeStampsByDefault = timeStampsDisableByDefault; }
從API描述看出來,它是設置JMS Message自身所帶的timestamp是否被禁用。具體使用這個屬性的代碼是在ActiveMQConnection的send方法
void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { ............... if (!isDisableTimeStampsByDefault()) { long timeStamp = System.currentTimeMillis(); msg.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } ............... }
從上述代碼能夠看出,若是disableTimeStampsByDefault屬性是false,ActiveMQ在發送Jms Message對象時,將無視消息對象自帶的timestamp,使用JmsMessage發送時的時間。這樣對於Jms Message對象timestamp屬性敏感的應用就會出現問題,而設置這個屬性爲true,將保留Jms Message對象原先的timestamp信息。
補充一點,若是不設置ActiveMQConnection的disableTimeStampsByDefault屬性,設置MessageProducer的disableMessageTimestamp屬性能夠起到相同的效果。MessageProducer的disableMessageTimestamp屬性的API描述是
public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable { ............... /** * Sets whether message timestamps are disabled. * <P> * Since timestamps take some effort to create and increase a message's * size, some JMS providers may be able to optimize message overhead if * they are given a hint that the timestamp is not used by an application. * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this * message producer, a JMS client enables this potential optimization for * all messages sent by this message producer. If the JMS provider accepts * this hint, these messages must have the timestamp set to zero; if the * provider ignores the hint, the timestamp must be set to its normal * value. * <P> * Message timestamps are enabled by default. * * @param value indicates if message timestamps are disabled * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to * some internal error. */ public void setDisableMessageTimestamp(boolean value) throws JMSException { ............ }
當MessageProducer在發送消息時使用到了這個屬性
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { .............. protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message,int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { .............. synchronized (sendMutex) { .............. if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } ....... }
對照ActiveMQConnection的send方法,能夠看出兩段代碼的做用徹底同樣,所以咱們能夠設置MessageProducer的disableMessageTimestamp屬性達到相同效果,具體修改代碼以下:
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(destination); producer.setDisableMessageTimestamp(true);
代碼修改完成後運行,咱們能夠看到目標ActiveMQ的TestQueue1中已經有遷移的三條消息。
JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = Test Queue1's first message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476267006371 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue's second message JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:2 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326826475 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue1's third message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:3 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326873071
與源ActiveMQ的三條消息比較,除了JMSMessageID屬性不同之外(MessageId是由發送Message的Client機器ID和隨機數生成的,不可能保證和原消息徹底一致),其餘屬性徹底一致。若是要保留源消息的MessageId信息,能夠將其設置到JMSCorrelationID屬性。
2.單ActiveMQ,多隊列消息遷移
在1中咱們實現單隊列消息的遷移。但若是源ActiveMQ有多個隊列,每一個隊列都有持久化消息,如何實現這些隊列消息的遷移?
咱們最初的思路是將全部包含持久化消息的隊列名稱寫入一個配置文件裏,在遷移的時候讀取這些隊列名稱。可是這樣作的問題就是不靈活,有可能在遷移時一些原先沒有持久化消息的隊列包含了消息,一些原先有持久化消息的隊列的消息被消費掉了,使得不是全部隊列的消息都能被遷移。
經過查看ActiveMQ的API文檔,咱們決定使用ActiveMQConnection類的destinationSource屬性,這個屬性的API描述是
/** * Returns the {@link DestinationSource} object which can be used to listen to destinations * being created or destroyed or to enquire about the current destinations available on the broker * * @return a lazily created destination source * @throws JMSException */ @Override public DestinationSource getDestinationSource() throws JMSException { ....... }
從API描述上能夠看出這個屬性是一個源對象,是對當前ActiveMQ鏈接上的全部Destination(包括Queue和Topic)的一個偵聽對象。從DestinationSource對象,咱們能夠獲取源ActiveMQ的全部Queue(也能夠得到全部Topic,這裏再也不贅述).
public class DestinationSource implements MessageListener { .............. /** * Returns the current queues available on the broker */ public Set<ActiveMQQueue> getQueues() { return queues; } .............. }
得到了源ActiveMQ的Queue集合,接下來的操做就和1同樣了,只是過濾掉那些不不包含持久化消息的Queue。實現代碼的片斷以下:
HashMap<String, List<TextMessage>> queueMessageMap = new HashMap<String, List<TextMessage>>(); ................. Set<ActiveMQQueue> activeMQQueues = connection.getDestinationSource().getQueues(); for(ActiveMQQueue destination: activeMQQueues) { QueueBrowser browser = session.createBrowser(destination); Enumeration<?> enumeration = browser.getEnumeration(); List<TextMessage> messageList = new ArrayList<TextMessage>(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); messageList.add(message); } if (messageList.size() > 0) { queueMessageMap.put(destination.getQueueName(), messageList); } ................. if(queueMessageMap.size() > 0) { sendMessages(queueMessageMap); } ................. private static void sendMessages(HashMap<String, List<TextMessage>> queueMessageMap) { ................. for(String queueName:queueMessageMap.keySet()) { // 建立目標,就建立主題也能夠建立隊列 Destination destination = session.createQueue(queueName); // 建立消息生產者 MessageProducer producer = session.createProducer(destination); // 設置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT producer.setDeliveryMode(DeliveryMode.PERSISTENT); List<TextMessage> messageList = queueMessageMap.get(queueName); for(TextMessage message:messageList) { // 發送消息到ActiveMQ producer.send(message); TimeUnit.MILLISECONDS.sleep(10); } }
咱們在源ActiveMQ的TestQueue1,TestQueue2,TestQueue3隊列裏分別建立三條持久化消息(以下圖所示)
刪除目標ActiveMQ TestQueue1隊列裏1中遷移的三條消息
運行代碼,咱們能夠看到源ActiveMQ中的三個隊列裏的九條消息被遷移到了目標ActiveMQ
3.多ActiveMQ,多隊列消息遷移
在這個場景中目標ActiveMQ是使用zk + Replicated LevelDB結構的三臺ActiveMQ。代碼仍然使用和2相同的代碼,只是鏈接目標ActiveMQ的brokerUrl形式改成failover:(tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx)
咱們使用的三臺ActiveMQ的清單以下:
ActiveMQ服務器號 | tcp端口號 | zk端口號 | 管理界面端口號 |
ActiveMQ-1 | 61616 | 2181 | 8161 |
ActiveMQ-2 | 62616 | 2181 | 8261 |
ActiveMQ-3 | 63616 | 2181 | 8361 |
最初三臺ActiveMQ中沒有隊列,也沒有消息(此時ActiveMQ-1爲Master,ActiveMQ-2,ActiveMQ-3爲Slave)
運行2的程序後,咱們能夠看到ActiveMQ-1中有了TestQueue1,TestQueue2,TestQueue3三個隊列,而且有了9條消息。
當咱們停掉ActiveMQ-1後,ActiveMQ-2成爲Master,咱們能夠從它的管理界面看到ActiveMQ-2也有三個隊列,9條消息。
這說明2的程序對於Zookeeper + Replicated LevelDB架構的多ActiveMQ 數據遷移仍然有效。
4.結語
本文實現了從數據庫介質的ActiveMQ到使用Replicated LevelDB文件介質的ActiveMQ的消息遷移,但並不只限於此種場景,任何兩個ActiveMQ之間均可以實現相似的消息遷移。在實際運行中,還可使用這樣的程序實現不一樣ActiveMQ之間的數據同步,以及在一個ActiveMQ內,消息在不一樣隊列間的遷移。這須要開發者根據實際須要進行修改和調整。