import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 1.Destination 目的地 * Queue 隊列 * 隊列中的消息,默認只能由惟一的一個消費者處理,一旦處理消息後消息就被刪除 * * 支持存在多個消費者,多個生產者,因此消費者不可能消費到已經被消費的消息 * 當消費者不存在時,消息會一直保存,直到有消費者消費 * * Topic 目的地 * 消息會發送給全部的消費者同時處理,只有在消息能夠重複處理的業務場景中可以使用 * * */ public class TestProducer { public void sendTextMessage(String mess){ ConnectionFactory connectionFactory = null; Connection connection = null; Destination destination = null; Session session = null; MessageProducer messageProducer =null; Message message = null; try { connectionFactory = new ActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); //發送者默認是啓動的,消費者默認是不啓動的,全部客戶端時必須啓動 //若是有特殊配置,配置後再啓動 connection.start(); //1 第一個參數 是否支持事務,不推薦事務,批量時建議使用 若是true 第二個參數無效(只是提供者) // 客戶端不支持事務 //2 消息確認機制 /** * 1 auto_acknowledge 自動確認消息,消息的消費者處理消息後,自動確認,經常使用,商業開發不推薦 * 2 client_acknowledge 客戶端手動確認,消息的消費者處理後,必須手工確認 * 3 dups_ok_acknowledge 有副本的客戶端手動確認 一個消息能夠屢次處理,能夠下降session的消耗,在 * * 能夠容忍重複消息時使用(不推薦使用)
//設置該消息的超時時間
//messageProducer.setTimeToLive(1000);
/**
* 過時的、處理失敗的消息,將會被ActiveMQ置入「ActiveMQ.DLQ」這個隊列中。這個隊列是由ActiveMQ自動建立的。
*
* 若是須要查看這些未被處理的消息,能夠進入這個隊列中查看
Destination destination = session.createQueue("ActiveMQ.DLQ");
*/ * */ session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("first-mq"); messageProducer = session.createProducer(destination); message = session.createTextMessage(mess); messageProducer.send(message);
//session.commit(); //啓用事務時記得提交事務,否則消費端接收不到消息 System.out.println("消息已發送"); }catch (Exception e){ e.printStackTrace(); }finally { if (messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { TestProducer t = new TestProducer(); t.sendTextMessage("發送一條消息"); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Administrator on 2019/6/27. */ public class TestConsumer { public void rTextMessage() { ConnectionFactory connectionFactory = null; Connection connection = null; Destination destination = null; Session session = null; MessageConsumer messageConsumer = null; Message message = null; try { connectionFactory = new ActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); //發送者默認是啓動的,消費者默認是不啓動的,全部客戶端時必須啓動 //若是有特殊配置,配置後再啓動 connection.start(); //1 第一個參數 是否支持事務,不推薦事務,批量時建議使用 若是true 第二個參數無效(只是提供者) // 客戶端不支持事務 //2 消息確認機制 /** * 1 auto_acknowledge 自動確認消息,消息的消費者處理消息後,自動確認,經常使用,商業開發不推薦 * 2 client_acknowledge 客戶端手動確認,消息的消費者處理後,必須手工確認 * 3 dups_ok_acknowledge 有副本的客戶端手動確認 一個消息能夠屢次處理,能夠下降session的消耗,在 * * 能夠容忍重複消息時使用(不推薦使用) * */ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("first-mq"); //messageProducer = session.createProducer(destination); messageConsumer = session.createConsumer(destination); /** * 主動活動消息,執行一次,拉去一個消息,開發少用 * 多個消費者要用監聽 */ message =messageConsumer.receive(); String mess = ((TextMessage)message).getText(); System.out.println("接收到的消息----------"+mess); } catch (Exception e) { e.printStackTrace(); } finally { if (messageConsumer != null) { try { messageConsumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { TestConsumer t = new TestConsumer(); t.rTextMessage(); } }
/** * 監聽隊列消息 */ public void receiveMessageListener() { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { connectionFactory = new ActiveMQConnectionFactory(brokerURL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("first-quere"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息:"+textMessage.getText()); //textMessage.acknowledge();消息確認機制 } catch (JMSException e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); }finally { } }
1java