首先建立一個maven工程,在pom文件中增長相關的依賴包,以下:java
<dependency> <groupId>javax.jms</groupId> <artifactId>jms-api</artifactId> <version>1.1-rev-1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
建立測試類:apache
發送消息類:api
SendMessagesession
package com.jason.testmq; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class SendMessage { private static final String url = "tcp://localhost:61616"; private static final String QUEUE_NAME = "choice.queue"; //private String expectedBody = "<hello>world!two</hello>"; //private String expectedBody = "stop"; public void sendMessage() throws JMSException { Connection connection = null; try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); connection = (Connection) connectionFactory.createConnection(); connection.start(); Session session = (Session) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); // TextMessage message = session.createTextMessage(expectedBody); // message.setStringProperty("headname", "remoteB"); JmsTestMessage testMessage = new JmsTestMessage(); testMessage.setId("1234567"); testMessage.setMsg("stop"); testMessage.setStatus(1); ObjectMessage message = session.createObjectMessage(testMessage); producer.send(message); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { SendMessage sndMsg = new SendMessage(); try { sndMsg.sendMessage(); } catch (Exception ex) { System.out.println(ex.toString()); } } }
接收消息類:maven
ReceiveMessagetcp
/** * */ package com.jason.testmq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ReceiveMessage { private static final String url = "tcp://localhost:61616"; private static final String QUEUE_NAME = "choice.queue"; public void receiveMessage() { Connection connection = null; try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); consumeMessagesAndClose(connection, session, consumer); } catch (Exception e) { System.out.println(e.toString()); } } protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException { for (int i = 0; i < 1;) { Message message = consumer.receive(1000); if (message != null) { i++; onMessage(message); } } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } public void onMessage(Message message) { // try { // if (message instanceof TextMessage) { // TextMessage txtMsg = (TextMessage) message; // String msg = txtMsg.getText(); // System.out.println("Received: " + msg); // } // } catch (Exception e) { // e.printStackTrace(); // } try { if (message instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage)message; Serializable obj = objMsg.getObject(); if (obj instanceof JmsTestMessage) { JmsTestMessage testMessage = (JmsTestMessage)obj; System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus()); } else { System.out.println("it is not JmsTestMessage"); } } else { System.out.println("other type message with type is " + message.getJMSType()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ReceiveMessage rm = new ReceiveMessage(); rm.receiveMessage(); } }
以註冊監聽的方式接收消息學習
ReceiveMessageWithListener測試
/** * */ package com.jason.testmq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ReceiveMessageWithListener implements MessageListener { private static final String url = "tcp://localhost:61616"; private static final String QUEUE_NAME = "choice.queue"; private boolean stop = false; public void receiveMessage() { (new Thread(new ReceiveMessageRunnable())).start(); } public void onMessage(Message message) { // try { // if (message instanceof TextMessage) { // TextMessage txtMsg = (TextMessage) message; // String msg = txtMsg.getText(); // System.out.println("Received: " + msg); // if ("stop".equals(msg)) { // this.stop = true; // } // } // } catch (Exception e) { // e.printStackTrace(); // } try { if (message instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage)message; Serializable obj = objMsg.getObject(); if (obj instanceof JmsTestMessage) { JmsTestMessage testMessage = (JmsTestMessage)obj; System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus()); if ("stop".equals(testMessage.getMsg())) { this.stop = true; } } else { System.out.println("it is not JmsTestMessage"); } } else { System.out.println("other type message with type is " + message.getJMSType()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ReceiveMessageWithListener rm = new ReceiveMessageWithListener(); rm.receiveMessage(); } private class ReceiveMessageRunnable implements Runnable { public void run() { Connection connection = null; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(ReceiveMessageWithListener.this); connection.start(); while (!ReceiveMessageWithListener.this.stop) { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
在隊列中傳遞的消息類this
JmsTestMessageurl
/** * */ package com.jason.testmq; import java.io.Serializable; /** * @author jasonzhang * */ public class JmsTestMessage implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String id; private String msg; private int status; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } }
參考連接: