一、消息頭java
二、消息體spring
三、消息屬性apache
一、TextMessage springboot
二、MapMessage session
三、ObjectMessage tcp
四、BytesMessage ide
五、StreamMessageurl
一、非持久化spa
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);code
非持久化,當mq宕機後消息不存在
二、持久化(消息默認是持久化)
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久化,當mq宕機後消息存在
一、非持久化隊列
生產者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生產者,隊列模式 MessageProducer messageProducer = session.createProducer(queue); //六、經過messageProducer生產三條消息發送到MQ消息隊列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //消息屬性 textMessage.setStringProperty("c01","vip"); //八、經過messageProducer發送給mq messageProducer.send(textMessage); //九、數據非持久化 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消費者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/3 8:47 * @Version: 1.0 */ public class ActiveMQConsumer { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、這裏接受的queue的名稱要和發送者的一致 Queue queue = session.createQueue(QUEUE_NAME); //五、建立消費者 MessageConsumer consumer = session.createConsumer(queue); //六、經過監聽的方式消費消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); //獲取消息屬性 System.out.println("消息屬性--->"+textMessage.getStringProperty("c01")); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保證控制檯一直在運行 System.in.read(); //八、閉資源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
二、非持久化主題
生產者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:29 * @Version: 1.0 */ public class ActiveMQTopicTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立主題 Topic topic=session.createTopic(TOPIC_NAME); //五、建立消息生產者,主題模式 MessageProducer messageProducer = session.createProducer(topic); //六、經過messageProducer生產三條消息發送到MQ消息主題中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //八、經過messageProducer發送給mq messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消費者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:43 * @Version: 1.0 */ public class ActiveMQTopicConsumer { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、這裏接受的queue的名稱要和發送者的一致 Topic topic = session.createTopic(TOPIC_NAME); //五、建立消費者 MessageConsumer consumer = session.createConsumer(topic); //六、經過監聽的方式消費消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保證控制檯一直在運行 System.in.read(); //八、閉資源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
三、持久化隊列
生產者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生產者,隊列模式 MessageProducer messageProducer = session.createProducer(queue); //六、經過messageProducer生產三條消息發送到MQ消息隊列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //消息屬性 textMessage.setStringProperty("c01","vip"); //八、經過messageProducer發送給mq messageProducer.send(textMessage); //九、數據持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消費者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/3 8:47 * @Version: 1.0 */ public class ActiveMQConsumer { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、這裏接受的queue的名稱要和發送者的一致 Queue queue = session.createQueue(QUEUE_NAME); //五、建立消費者 MessageConsumer consumer = session.createConsumer(queue); //六、經過監聽的方式消費消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); //獲取消息屬性 System.out.println("消息屬性--->"+textMessage.getStringProperty("c01")); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保證控制檯一直在運行 System.in.read(); //八、閉資源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
四、持久化主題
生產者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:29 * @Version: 1.0 */ public class ActiveMQTopicTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立主題 Topic topic=session.createTopic(TOPIC_NAME); //五、建立消息生產者,主題模式 MessageProducer messageProducer = session.createProducer(topic); //持久化數據 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); //六、經過messageProducer生產三條消息發送到MQ消息主題中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //八、經過messageProducer發送給mq messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消費者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.bdqn.test * @Author: huat * @Date: 2020/1/4 9:43 * @Version: 1.0 */ public class ActiveMQTopicConsumer { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主題名稱 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //訂閱名稱 connection.setClientID("test01"); //三、建立session會話 //裏面會有兩個參數,第一個爲事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、這裏接受的queue的名稱要和發送者的一致 Topic topic = session.createTopic(TOPIC_NAME); //五、持久化的訂閱者,第一個參數爲訂閱主題名稱,第二個爲備註 TopicSubscriber topicSubscriber=session.createDurableSubscriber(topic,"remakr"); //六、啓動鏈接 connection.start(); //receive等待消息,不限制時間 Message message = topicSubscriber.receive(); while (null!=message){ TextMessage textMessage=(TextMessage)message; System.out.println("******------>"+textMessage.getText()); //等待五秒鐘 message=topicSubscriber.receive(5000L); } session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
主題持久化注意事項
一、若是消費者註冊關閉後,消息提供者在消費者關閉後在發送消息,消費者再打開,會收到消息提供者發送的消息。
二、若是在消費者沒有註冊以前,消息提供者發送消息,消費者不會收到以前的消息
總結:
一、必定要先運行一次消費者,等同於向MQ註冊,相似我訂閱了這個主題
二、而後在運行消費者發送消息
三、不管消費者是否在線,都會收到消息,不在線的話,下次鏈接時會把沒有收到過的消息所有接受。