ActiveMQ官網java
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。apache
ConnectionFactory:用於建立鏈接到消息中間件的鏈接工廠。
Connection:表明了應用程序和服務之間的鏈接通路。
Destination:指消息發佈的地點,包括隊列模式和主題模式。
Session:表示一個單線程的上下文,用於發送和接受消息。
MessageConsumer:由會話建立,用於接受發送到目的的消息。
MessageProducer:由會話建立,用於發送消息。
Message:是在消費者和生產者之間傳遞的對象,消息頭,一組消息屬性,和一個消息體。session
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
發送隊列模型消息tcp
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MessageProducer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立一個生產者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模擬100個消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("我發送message:" + i); //發送消息 producer.send(message); //在本地打印消息 System.out.println("我如今發的消息是:" + message.getText()); } //關閉鏈接 connection.close(); } }
接收隊列模型消息ide
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MessageConsumer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立消費者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消費的監聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
注:隊列模型的消息,只會被一個消費者所消費,而不會被共享。線程
發送主題模型消息code
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MessageTopicProducer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的主題名稱 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createTopic(TOPIC_NAME); //建立一個生產者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模擬100個消息 for (int i = 1; i <= 100; i++) { TextMessage message = session.createTextMessage("當前message是(主題模型):" + i); //發送消息 producer.send(message); //在本地打印消息 System.out.println("我如今發的消息是:" + message.getText()); } //關閉鏈接 connection.close(); } }
接收主題模型消息component
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MessageTopicConsumer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createTopic(TOPIC_NAME); //建立消費者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消費的監聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
注:消費者要先訂閱主題,這樣生產者發送的主題模型消息才能被消費者消費。xml