1、下載java
ActiveMQ 5.15.0下載地址spring
2、安裝apache
D:\apache-activemq-5.15.7-bin\apache-activemq-5.15.7\bin\win64緩存
雙擊activemq.bat執行 開始自動安裝啓動服務器
訪問地址mq客戶端頁面 http://localhost:8161/admin/topics.jsp 登陸帳戶密碼默認admin/adminsession
3、使用jsp
jar包 activemq-all-5.4.3.jar activemq-pool-5.4.3.jartcp
點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發送消息,receive接收消息.具體點就是Sender Client發送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行ide
發佈/訂閱方式用於多接收客戶端的方式.做爲發佈訂閱的方式,可能存在多個接收客戶端,而且接收端客戶端與發送客戶端存在時間上的依賴。一個接收端只能接收他建立之後發送客戶端發送的信息。做爲subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。學習
發送消息的基本步驟:
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息生產者MessageSender
(5)、使用消息生產者MessageSender發送消息
消息接收者從JMS接受消息的步驟
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,須要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,須要定義onMessage事件方法。
4、java代碼
Spring集成
新建activemq_config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<!-- <value>tcp://192.168.0.140:61616</value> -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.0.127:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!--測試Queue,隊列的名字是spring-queue-->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-queue"/>
</bean>
<!--測試Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="cms.vss.pic.topic2"/>
</bean>
<!--使用緩存能夠提高效率-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<!-- 生產者 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- 消費者 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="topicListener"/>
</bean>
<!-- 消息監聽器 -->
<bean id="topicListener" class="com.zlkj.activemq.listener.TopicListener">
</bean>
</beans>
發送消息
package com.zlkj.wl.service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.zlkj.wl.bean.ActiveBean;
@Service("senderService")
public class AMQSenderServiceImpl implements AmqSenderService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
//目的地隊列的明證,咱們要向這個隊列發送消息
@Resource(name = "destinationTopic")
private Destination destination;
//向特定的隊列發送消息
@Override
public void sendMsg(final ActiveBean mqParamDto) {
//final String msg = JsonTools.ojbToJson(mqParamDto, false);
// final String msg =mqParamDto;
try {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public ObjectMessage createMessage(Session session) throws JMSException {
return session.createObjectMessage(mqParamDto);
//return session.createTextMessage(msg);
}
});
} catch (Exception ex) {
}
}
}
接收消息則經過xml中配置監聽能夠接收到消息
建立監聽類
package com.zlkj.activemq.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TopicListener implements MessageListener {
@Override
public void onMessage(Message msg) {
try {
Destination destination = msg.getJMSDestination();
String topic = destination.toString();
if(topic.equals("topic://INOUT")) {
System.out.println("1");
TextMessage txtMsg = (TextMessage) msg;
String message = txtMsg.getText();
//實際項目中拿到String類型的message(一般是JSON字符串)以後,
//會進行反序列化成對象,作進一步的處理
System.out.println("receive txt msg===" + message);
} else if (topic.equals("topic://ENROLL")) {
System.out.println("2");
TextMessage txtMsg = (TextMessage) msg;
String message = txtMsg.getText();
//實際項目中拿到String類型的message(一般是JSON字符串)以後,
//會進行反序列化成對象,作進一步的處理
System.out.println("receive txt msg===" + message);
} else if (topic.equals("topic://ACTION")) {
System.out.println("3");
TextMessage txtMsg = (TextMessage) msg;
String message = txtMsg.getText();
//實際項目中拿到String類型的message(一般是JSON字符串)以後,
//會進行反序列化成對象,作進一步的處理
System.out.println("receive txt msg===" + message);
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
接收消息監聽不打出時 注意constructor-arg的值(標紅)和手動發送消息的session.createTopic("ENROLL")值應相同
不集成spring
發送消息
package com.zlkj.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.zlkj.pub.toolutil.JsonTools;
import com.zlkj.wl.bean.ActiveBean;
public class NoPersistenceActiveSender {
private static final int SENDNUM = 10;
public static void sendmes( ActiveBean record) {
//鏈接工廠
ConnectionFactory connectionFactory;
//鏈接
Connection connection = null;
//會話 接受或者發送消息的線程
Session session;
//消息的目的地
Destination destination;
//消息生產者
MessageProducer messageProducer;
//實例化鏈接工廠(鏈接到ActiveMQ服務器)
connectionFactory = new ActiveMQConnectionFactory(null, null, "tcp://192.168.0.127:61616");
try {
//經過鏈接工廠獲取鏈接
connection = connectionFactory.createConnection();
//啓動鏈接
connection.start();
//建立session
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//建立一個名稱爲MyTopic的消息隊列(生產者生成的消息放在哪)
destination = session.createTopic("ENROLL");
//建立消息生產者
messageProducer = session.createProducer(destination);
//發送消息
sendMessage(session, messageProducer,record);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發送消息
*
* @param session
* @param messageProducer 消息生產者
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer messageProducer,ActiveBean record) throws Exception {
ObjectMessage message = session.createObjectMessage(record);
messageProducer.send(message);
// }
}
}
接收消息
package com.zlkj.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class NoPersistenceReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;//鏈接工廠
Connection connection = null;//鏈接
Session session;//會話 接受或者發送消息的線程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消費者
//實例化鏈接工廠(鏈接到ActiveMQ服務器)
connectionFactory = new ActiveMQConnectionFactory(null, null, "tcp://192.168.0.127:61616");
try {
//經過鏈接工廠獲取鏈接
connection = connectionFactory.createConnection();
//啓動鏈接
connection.start();
//建立session
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//生產者將消息發送到MyTopic,因此消費者要到MyTopic去取
destination = session.createTopic("ENROLL1a");
//destination = session.createTopic("MyTopic");
//建立消息消費者
messageConsumer = session.createConsumer(destination);
Message message = messageConsumer.receive();
while (message != null) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("收到消息:" + txtMsg.getText());
message = messageConsumer.receive();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息的時候必定執行main方法 用while死循環一直輪訓手動接收發送的消息
學習activeMq源路徑網址 https://www.jianshu.com/p/8caa6d66b10d