須要jdk,安裝Linux系統,生產環境都是Linux系統。
安裝步驟:
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啓動。
使用bin目錄下的activemq命令啓動:
[root@localhost bin]# ./activemq start
關閉:
[root@localhost bin]# ./activemq stop
查看狀態:
[root@localhost bin]# ./activemq status
注意:若是ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2
進入管理後臺:
http://192.168.25.143:8161/admin
用戶名:admin
密碼:admin
spring
對於消息的傳遞有兩種類型:apache
一種是點對點的,即一個生產者和一個消費者一一對應;另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。
JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。服務器
廣播通訊模式session
點到點通訊app
依賴的jar包tcp
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> </dependency>
Producer:消息的發送方
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。ide
public class ActiveMqTest { /** * 點到點形式發送消息 */ @Test public void testQueueProducer() throws Exception { //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.144:61616"); //二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); //三、開啓鏈接,調用Connection對象的start方法。 connection.start(); //四、建立一個Session對象。 //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用queue Queue queue = session.createQueue("test-queue"); //六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); //七、建立一個Message對象,能夠使用TextMessage。 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello Activemq");*/ TextMessage textMessage = session.createTextMessage("hello activemq"); //八、發送消息 producer.send(textMessage); //九、關閉資源 producer.close(); session.close(); connection.close(); }
Consumer:消息的接收方
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源工具
@Test public void testQueueConsumer() throws Exception { //建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.144:61616"); //建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); //使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個Destination對象。queue對象 Queue queue = session.createQueue("test-queue"); //使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(queue); //接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待接收消息 System.in.read(); //關閉資源 consumer.close(); session.close(); connection.close(); }
Topic方式的廣播通訊測試性能
Producer:消息的發送方
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。測試
@Test public void testTopicProducer() throws Exception { //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.144:61616"); //二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); //三、開啓鏈接,調用Connection對象的start方法。 connection.start(); //四、建立一個Session對象。 //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic Topic topic = session.createTopic("test-topic"); //六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); //七、建立一個Message對象,能夠使用TextMessage。 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello Activemq");*/ TextMessage textMessage = session.createTextMessage("topic message"); //八、發送消息 producer.send(textMessage); //九、關閉資源 producer.close(); session.close(); connection.close(); }
Consumer:消息的接收方
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源。
@Test public void testTopicConsumer() throws Exception { //建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.144:61616"); //建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); //使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個Destination對象。topic對象 Topic topic = session.createTopic("test-topic"); //使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(topic); //接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消費者3啓動。。。。"); //等待接收消息 System.in.read(); //關閉資源 consumer.close(); session.close(); connection.close(); }
第一步:引用相關的jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
第二步:配置Activemq整合spring。配置ConnectionFactory
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans>
第三步:配置生產者
使用JMSTemplate對象發送消息
<!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean>
第四步:在spring容器中配置Destination
<!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean>
第五步:代碼測試
①:初始化一個spring容器
②:從容器中得到JMSTemplate對象。
③:從容器中得到一個Destination對象
④:使用JMSTemplate對象發送消息,須要知道Destination
@Test public void testSpringActiveMq() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //從spring容器中得到JmsTemplate對象 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //從spring容器中取Destination對象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //使用JmsTemplate對象發送消息。 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //建立一個消息對象並返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
接收消息
在另外一個工程中建立接收消息
第一步:把Activemq相關的jar包添加到工程中
第二步:建立一個MessageListener的實現類
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
第三步:配置spring和Activemq整合
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.144:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean>
第四步:測試代碼
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
原文地址:https://www.jianshu.com/p/38a67c47b097