1、ActivityMq的介紹:html
1.什麼是消息中間件?與傳統的傳輸通信有什麼區別?java
異步,無需等待,消息存放在隊列裏面。spring
2.爲何要使用消息中間件?apache
消息中間件能夠解決高併發。瀏覽器
兩種通信方式:01.點對點通信。(Queue)緩存
02.發佈訂閱。(Topic)服務器
生產者:發送消息,提供接口的,主要向隊列中發送消息session
隊列:存放消息地址併發
消息:發送的報文信息異步
消費者:調用接口的,主要從隊列中獲取消息
3.步驟:
0一、生產者向隊列進行發送消息,若是消費者不在,隊列就會將消息緩存
0二、消費者向隊列中獲取到消息以後,消費成功以後, 該消息隊列直接被清除掉。(不清除就會產生重複消費問題)
4.生產者向隊列中生產高併發流量,消費者會不會掛掉?
不會,由於隊列會緩存消息。
5.爲何MQ可以解決高併發問題?
不會立馬處理那麼多的消息,隊列會進行緩存,進行排隊。
6.JMS:java發送消息,客戶端與服務器進行通信的方式,能夠理解成java的消息 中間件
消息模型:
0一、點對點通信方式:
流程:生產者 隊列 消費者。
特色:一對一 異步通信,生產者生產的消息 只容許有一個消費者進行消費。
0二、發佈訂閱:
流程:生產者 主題 消費者
特色:發佈訂閱 一個生產者 能夠多個消費者 一對多。
2、ActivityMq安裝:
1.下載ActivityMQ
官方網站:http://activemq.apache.org/download.html
2.運行ActivityMQ
解壓apache-activemq-5.15.9-bin.zip,進入該文件夾的bin目錄。
有兩種方式啓動ActivityMQ服務
0一、在bin目錄下用cmd命令activemq start 啓動服務,關掉黑窗口服務即中止
0二、進入bin目錄下對應電腦位數的文件夾,64位進入win64,雙擊InstallService.bat批處理文件安裝ActiveMQ服務,而後打開任務管理器啓動ActiveMQ服務
啓動服務後打開瀏覽器輸入:http://localhost:8161/admin/ 輸入默認設置的帳戶:admin密碼admin
點擊隊列(Queues),輸入隊列名稱(Queue Name)FirstQueue,而後點建立(Craete)
3.建立maven項目
添加一個activemq-all-5.15.3.jar便可,在pom.xml加入
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> </dependencies>
3、建立producer.java和consumer.java
producer.java
package main; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認鏈接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { // 鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { // 鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立session Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 消息目的地 Destination destination = session.createQueue("FirstQueue"); // 消息生產者 MessageProducer producer = session.createProducer(null); // 設置不持久化,此處學習,實際根據項目決定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 發送消息 for (int i = 0; i < 10; i++) { // 建立一條文本消息 TextMessage message = session.createTextMessage("ActiveMQ: 這是第 " + i + " 條消息"); // Destination destination = session.createTopic("FirstQueueTopic"); // 生產者發送消息 producer.send(destination, message); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
consumer.java
package main; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { // 默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認鏈接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { // 鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { // 鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息目的地 Destination destination = session.createQueue("FirstQueue"); // Destination destination = session.createTopic("FirstQueueTopic"); // 消息消費者 MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(); if (message != null) { System.out.println("接收到消息: " + message.getText()); } else { break; } } session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
4、測試:
先運行producer.java,再運行consumer.java。
5、項目中使用:
spring-activemq.xml配置文件:
生產者
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.username}" password="${activemq.password}" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定義JmsTemplate的Queue類型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(發佈/訂閱),即隊列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(發佈/訂閱) --> <property name="pubSubDomain" value="true" /> </bean> </beans>
config.properties配置用戶名,密碼,url等信息。
activemq.brokerURL=failover:(tcp://10.135.100.59:9203,tcp://10.135.100.60:9203) #activemq.brokerURL=tcp://10.135.100.59:9203 activemq.username=admin activemq.password=admin
代碼中的使用:
@Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate;// 經過@Qualifier修飾符來注入對應的bean /** * @Title: send 發送一條消息到指定的隊列(目標) * @Description: * @param queueName 隊列名稱 * @param message 消息內容 * @date 2016年10月12日 上午10:15:56 */ public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); }