這樣的一種通訊方式,就是所謂的「異步」通訊方式,對於系統A來講,只要把消息發給MQ,而後系統B就會異步處去進行處理了,系統A不能「同步」的等待系統B處理完。這樣的好處是什麼呢?解耦html
4,什麼是ActiveMQjava
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resourceadaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE1.4 商業服務器上5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA6. 支持經過JDBC和journal提供高速的消息持久化7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點8. 支持Ajax9. 支持與Axis(Apache Extensible Interaction System 即阿帕奇可擴展交互系統。Axis本質上就是一個SOAP引擎,提供建立服務器端、客戶端和網關SOAP操做的基本框架)的整合10. 能夠很容易得調用內嵌JMS provider,進行測試11.支持集羣1,下載
2,安裝
1,配置jdk環境變量【不會的回看Linux】2,上傳mq的壓縮包到Linux3,解壓到usr/local/ActiveMQmkdir /usr/local/ActiveMQ
tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /usr/local/ActiveMQ/
5,配置用戶名和密碼[默認爲admin/admin]vim conf/users.properties
admin = admin
4,啓動和中止重啓./bin/activemq start
2./bin/activemq stop
3./bin/activemq restart
5,訪問
5,端口說明
ActiveMQ是使用61616端口提供的JMS服務使用8161提供管理控制檯的服務
1,JMS消息發送模式
在點對點或隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被歸納爲:只有一個消費者將得到消息。生產者不須要在接收者消費該消息期間處於運行狀態,接收者也一樣不須要在消息發送時處於運行狀態。每個成功處理的消息都由接收者簽收。
發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。這種模式比如是匿名公告板。這種模式被歸納爲:多個消費者能夠得到消息.在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便客戶可以購訂閱。
訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。在那種狀況下,在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。2,JMS應用程序接口
1,ConnectionFactory 接口(鏈接工廠)
用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,
代碼不須要進行修改。管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。
2,Connection 接口(鏈接)
鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。
3,Destination 接口(目標)
目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。
和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題
4,MessageConsumer 接口(消息消費者)由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。5,MessageProducer 接口(消息生產者)由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。6,Message 接口(消息)是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另外一個應用程序。一個消息有三個主要部分:消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。消息接口很是靈活,並提供了許多方式來定製消息的內容。7,Session 接口(會話)表示一個單線程得上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續得。就是說消息按照發送的順序一個一個接收的。會話得好處是它支持事務,若是用戶支持了事務支持,會話得上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶可使用回滾操做取消這些消息。一個會話容許用戶建立消息產生者來發送消息,建立消息消費者來接收消息。1,建立項目加入maven依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.activemq</groupId> <artifactId>activemq</artifactId> <version>1.0-SNAPSHOT</version> <!--activemq須要的jar包 不是使用最新版本的。有BUG --> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <!--下面是log4等通用配置 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> </dependency> </dependencies> </project>
2,生產者
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。第二步:使用ConnectionFactory對象建立一個Connection對象。第三步:開啓鏈接,調用Connection對象的start方法。第四步:使用Connection對象建立一個Session對象。第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。第六步:使用Session對象建立一個Producer對象。第七步:建立一個Message對象,建立一個TextMessage對象。第八步:使用Producer對象發送消息。第九步:關閉資源。package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-26.
*/
public class ActiveMq {
private static final String QUEUE = "queue";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception{
//第一步:建立ActiveMQConnectionFactory對象,須要指定服務端IP以及端口號.//brokerURL服務器得IP以端口號。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:使用connectionFactory建立一個connection對象
Connection connection = connectionFactory.createConnection();
//第三步:開啓鏈接,調用connection對象start得方法
connection.start();
//第四步:使用Connection對象建立一個session對象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session對象建立Destination對象(topic、queue),此處建立一個Queue對象。//參數:隊列得名稱。
Queue queue = session.createQueue(QUEUE);
//第六步:使用session對象建立一個Producer對象
MessageProducer producer = session.createProducer(queue);
//第七步:建立一個Message = new ActiveMQTextMessage();
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");
//第八步:
producer.send(textMessage);
//第九步
producer.close();
session.close();
connection.close();
System.out.println("生產者向MQ發送消息成功!");
}
}生產完成以後能夠查看有消息生成
3,消費者
消費者有兩種消費方法:apache
一、同步消費。經過調用消費者receive方法從目的地中顯示提取消息,receive方法能夠一直阻塞到消息到達。vim
二、異步消費。客戶能夠爲消費者註冊一個消息監聽器,以定義在消息到達時所採起得動做。後端
實現MessageListener接口,在MessageListener()方法中實現消息得處理邏輯。api
4,同步消費者【通常不推薦】
第一步:建立一個鏈接工廠服務器
第二步:建立一個鏈接微信
第三步:打開鏈接session
第四步:建立會話架構
第五步:建立目的地
第六步:建立消費者
第七步:接收消息
第八步:關閉資源
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-26.
*/
public class ActiveMq {
private static final String QUEUE = "queue";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception{
//第一步:建立ActiveMQConnectionFactory對象,須要指定服務端IP以及端口號.//brokerURL服務器得IP以端口號。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:使用connectionFactory建立一個connection對象
Connection connection = connectionFactory.createConnection();
//第三步:開啓鏈接,調用connection對象start得方法
connection.start();
//第四步:使用Connection對象建立一個session對象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session對象建立Destination對象(topic、queue),此處建立一個Queue對象。//參數:隊列得名稱。
Queue queue = session.createQueue(QUEUE);
//第六步:使用session對象建立一個Producer對象
MessageProducer producer = session.createProducer(queue);
//第七步:建立一個Message = new ActiveMQTextMessage();
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");
//第八步:
producer.send(textMessage);
//第九步
producer.close();
session.close();
connection.close();
System.out.println("生產者向MQ發送消息成功!");
}
}
4.1,receive方法說明
receive() 一直阻塞
receive(1000) 10秒類沒接收消息就放棄
5,異步消費者【推薦】
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* Created by Administrator on 2019-10-26.
*/
public class TestQueueAyncConsumer {
private static final String QUEUE = "queue";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws JMSException, IOException {
//第一步:建立一個ConnectionFactory對象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:從ConnectionFactory對象獲取一個connection對象
Connection connection = connectionFactory.createConnection();
//第三步:開啓鏈接
connection.start();
//第四步:使用connection對象建立一個session對象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session對象建立一個Destination對象,和發送端保持同樣queue,而且隊列和名稱一致
Queue queue = session.createQueue(QUEUE);
//第六步:使用Session對象建立一個Consumer對象
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//獲取消息內容
String text = textMessage.getText();
//打印消息
System.out.println("消息接收到消息:"+text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入目的
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
55.1 MessageListener接口說明
一個消費的監聽器,當有消息傳達時會回調裏面的OnMessage方法
在測試的時候不能讓應用程序結束,因此在加System.in.read();
總結特色
1.每個消息只能有一個消費者,相似於1對1的關係。比如我的快遞本身領取本身的。
2.消息的生產者和消費者之間沒有時間上的相關性。不管消費者在生產者發送消息的時候是否處於運行狀態,消費者均可以提取消息,比如咱們發送短消息,發送短信,發送者發送以後不見得接收者會當即接受,如關機。
3.消息被消費後隊列中不會再存儲,因此消費者不會消費到已經被消費掉的消息。
1,消費狀況1
先生產,只啓用1號消費
問題:1號消費者消費到消費嗎?
2,消費狀況2
先生產,先啓動1號消費者再啓動2號消費者
3,消費狀況3
先啓動2個消費者,再生產10條消息
1,建立生成者Producer
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-27.
*/
public class TestMessageTopicProducer {
private static final String QUEUE = "test-topic";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception {
//第一步:建立ActiveMQConnectionFactory對象,須要指定服務端ip及端口號, // brokerurl服務器的IP及端口號
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:使用ConnectionFactory對象建立一個Connection對象。
Connection connection = connectionFactory.createConnection();
//第三步:開啓鏈接
connection.start();
//1.第一個參數:是否開啓事務.true :開啓事務,第二個參數忽略
//2.第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答 2,手動應答.通常是自動應答
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。//參數:隊列的名稱。
Topic topic = session.createTopic(QUEUE);
//第六步:使用Session對象建立一個producer對象。
MessageProducer producer = session.createProducer(topic);
//第七步:建立Message對象.建立一個TextMessage對象。
for (int i = 1; i < 5 ; i++){
TextMessage textMessage = session.createTextMessage("Topic mag....."+i);
producer.send(textMessage);
}
//第九步:關閉資源
producer.close();
session.close();
connection.close();
System.out.println("生產者向MQ發送成功.............");
}
}2,建立消費者 Consumer
第一步:建立一個ConnectionFactory對象。第二步:從ConnectionFactory對象中得到一個Connection對象。第三步:開啓鏈接。調用Connection對象的start方法。第四步:使用Connection對象建立一個Session對象。第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。第六步:使用Session對象建立一個Consumer對象。第七步:接收消息。第八步:打印消息。第九步:關閉資源package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-27.
*/
public class TestTopicAyncConsumer {
private static final String QUEUE = "test-topic";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception {
//第一步:建立一個connectionFactory對象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:從ConnectionFactory對象中獲取一個Connection的對象
Connection connection = connectionFactory.createConnection();
//第三步:
connection.start();
//第四步:使用Session對象建立一個session對象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session對象建立一個Destination對象.和發送端保持一致的queue,而且隊列的名稱一致。
Topic topic = session.createTopic(QUEUE);
//第六步:使用session對象建立一個Consumer對象
MessageConsumer consumer = session.createConsumer(topic);
//第七步:接收消息.
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String msg = textMessage.getText();
//打印消息
System.out.println("接收者收到消息:"+msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
consumer.close();
session.close();
connection.close();
}
}先啓動消費者再啓動生產者,要否則發送的廢消息如你不關注訂閱號就接收不到消息
1,JMS是什麼
JMS的全稱是Java Message Service ,即Java消息服務。用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。
它是JAVAEE技術體系中的一個消息傳遞服務中間件,而MQ是JMS的落得產品如下是JAVAEE技術體系
2.什麼是消息服務
JAVA消息服務指的是連個程序之間進行異步通訊的API,它爲標準消息協議和消息服務提供了一組通用接口,包括建立,發送,讀取消息等等。
用於支持JAVA應用程序開發。在JAVAEE中,當兩個應用程序使用JMS進行通訊的時候,它們之間並非直接相互的,而是經過一個共同的消息收發消息服務組件關聯起來以達到解耦,異步,肖鋒的效果。
JMS相關落地的產品及比較
JMS組成的特色
1.JMS Provider 做用:實現JMS接口規範的消息中間件,也就是咱們學習的MQ
2.JMS Producer做用:消息生產者,建立與發送JMS消息的客戶端應用。
3.JMS Consumer做用:消息的消費者,接受與處理JMS消息的客戶端應用。
4.JMS Messager做用:消息的載體.
JMS Messager詳解
消息頭
JMSDesination:消息發送目的地,指的是Queue和Topic
JMSDeliveyMode:消息的持久模式和非持久模式。
一條持久性的消息應該被傳送"僅僅一次' 這就意味着若是JMS提供者出現故障,該消息並不會丟失,它會在服務器恢復以後再次鏈接。
一條持久性的消息:最多會傳送一次,這意味着服務器若是出現故障,該消息將永遠消失
JMSExpiration:消息過時設置,默認永遠不過時,
消息過時時間,等於Destination的send方法的timeToLive值加上發送時間點的GMT時間值。
若是timeToLive的值等於0,則消永不過時。
若是發送後,在消息過時時間以後消息尚未被髮送到目的地。則該消息被清除。
若是timeToLive的值等於0,則消永不過時。
若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除
JMSPriority:消息優先級,從0-9十個級別,0-4是普通消息,5-9是加急消息。
JMS不要求MQ嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達的。默認4級。
JMSMessageID: 惟一標識每個消息的表示,由MQ產生。
消息體:消息的載體
StreamMessage Java原始值的數據流
MapMessage 一套名稱-值對
TextMessage 一個字符串對象
ObjectMessage 一個序列化的 Java對象
BytesMessage 一個字節的數據流
注意:發送和接收消息必須是一一對應的
消息屬性
消息屬性是一種增強型的API
若是須要使用消息頭髮外的值,那麼可使用消息屬性
用於識別、去重、重點標註等等操做它們是屬性名屬性值對應的形式制定的。能夠把屬性看着消息頭的擴展,屬性指定一些消息頭沒有包括的附加信息,好比能夠在屬性裏指定消息選擇器消息的屬性就像能夠分配給一個消息的附加消息頭同樣,它們容許開發者添加有關消息的不透明的附加信息,它們還用於暴露消息選擇器在消息過濾時使用的數據。TextMessage message=session.createTextMessage();
message.setText(text);
message.setStringProperty("username","小明");//自定義屬性
1,參數配置說明
1.1,持久設置
1,準備工做【建立項目加入依賴】