發送主題(topic)消息的類
package com.jason.testmq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class SendTopic { private static final String url = "tcp://localhost:61616"; private static final String TOPIC_NAME = "choice.topic"; //private String expectedBody = "<hello>world!two</hello>"; //private String expectedBody = "stop"; public void sendMessage() throws JMSException { Connection connection = null; try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); connection = (Connection) connectionFactory.createConnection(); connection.start(); Session session = (Session) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // TextMessage message = session.createTextMessage(expectedBody); // message.setStringProperty("headname", "remoteB"); JmsTestMessage testMessage = new JmsTestMessage(); testMessage.setId("1234567"); testMessage.setMsg("stop"); testMessage.setStatus(1); ObjectMessage message = session.createObjectMessage(testMessage); producer.send(message); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { SendTopic sndMsg = new SendTopic(); try { sndMsg.sendMessage(); } catch (Exception ex) { System.out.println(ex.toString()); } } }
接收訂閱消息的類
/** * */ package com.jason.testmq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class ReceiveTopic implements MessageListener { private static final String url = "tcp://localhost:61616"; private static final String TOPIC_NAME = "choice.topic"; private boolean stop = false; public void receiveMessage() { (new Thread(new ReceiveTopicRunnable())).start(); } public void onMessage(Message message) { // try { // if (message instanceof TextMessage) { // TextMessage txtMsg = (TextMessage) message; // String msg = txtMsg.getText(); // System.out.println("Received: " + msg); // } // } catch (Exception e) { // e.printStackTrace(); // } try { if (message instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage)message; Serializable obj = objMsg.getObject(); if (obj instanceof JmsTestMessage) { JmsTestMessage testMessage = (JmsTestMessage)obj; System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus()); if ("stop".equals(testMessage.getMsg())) { this.stop = true; } } else { System.out.println("it is not JmsTestMessage"); } } else { System.out.println("other type message with type is " + message.getJMSType()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ReceiveTopic rm = new ReceiveTopic(); rm.receiveMessage(); } private class ReceiveTopicRunnable implements Runnable { public void run() { Connection connection = null; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(ReceiveTopic.this); connection.start(); while (!ReceiveTopic.this.stop) { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
消息類
/** * */ package com.jason.testmq; import java.io.Serializable; /** * @author jasonzhang * */ public class JmsTestMessage implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String id; private String msg; private int status; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } }
在發佈/訂閱模式下,只有在消息發佈到 topic 時正在監聽該主題的訂閱者才能收到消息;若是在消息發佈完之後才開始監聽,則接收不到之前已發佈的消息。因此,接收消息的類必須以線程的方式啓動,並且必須在運行發佈消息的類之前先運行訂閱消息的類。
參考連接: