ActiveMQ

1、什麼是消息中間件(MQ)html

1.1 爲何會須要消息隊列(MQ)?java

  主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達MySQL,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。數據庫

2.2 什麼是消息中間件macos

  消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見的角色大體也就有Producer(生產者)、Consumer(消費者)
  常見的消息中間件產品:
  (1)ActiveMQ
  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。咱們在本次課程中介紹 ActiveMQ的使用。
  (2)RabbitMQ
  AMQP協議的領導實現,支持多種場景。淘寶的MySQL集羣內部有使用它進行通信,OpenStack開源雲平臺的通訊組件,最早在金融行業獲得運用。
  (3)ZeroMQ
  史上最快的消息隊列系統
  (4)Kafka
  Apache下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據。apache

2、JMS簡介服務器

2.1 什麼是JMS  session

  JMS(Java Messaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。併發

       JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您可以經過消息收發服務(有時稱爲消息中介程序或路由器)從一個 JMS 客戶機向另外一個 JML 客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。
  JMS 定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。異步

· TextMessage--一個字符串對象
· MapMessage--一套名稱-值對
· ObjectMessage--一個序列化的 Java 對象
· BytesMessage--一個字節的數據流
· StreamMessage -- Java 原始值的數據流tcp

2.2 JMS 消息傳遞類型

  對於消息的傳遞有兩種類型:

  一種是點對點的,即一個生產者和一個消費者一一對應:

  另外一種是發佈/ 訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收:

  

 3、ActiveMQ下載與安裝

3.1 下載

  官方下載地址:http://activemq.apache.org/activemq-5153-release.html

3.2 安裝(OSX,Linux類同)

  解壓下載文件,apache-activemq-5.12.0-bin.tar.gz

tar zxvf apache-activemq-5.12.0-bin.tar.gz

  爲apache-activemq-5.12.0目錄賦權

chmod 777 apache-activemq-5.12.0

3.3 啓動、訪問與關閉

#切換至安裝目錄macosx下

macosx Mac$ pwd

/Users/Mac/JavaUtils/apache-activemq-5.15.3/bin/macosx

#啓動activemq服務

macosx Mac$ ./activemq start

Starting ActiveMQ Broker...

#關閉activemq服務

:macosx Mac$ ./activemq stop

Stopping ActiveMQ Broker...

Stopped ActiveMQ Broker.

:macosx Mac$ 

 4、JMS入門

 4.1 點對點模式

一個生成者產生一個消息 只能被被一個消費者消費,消費完,消息就沒有了。

 4.1.1 消息生產者

(1)建立工程,引入依賴

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
</dependency>

(2)建立生產者

 1 public class QueueProducer {
 2 
 3     public static void main(String[] args) throws JMSException {
 4         //1.建立鏈接工廠
 5         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 6         //2.獲取鏈接
 7         Connection connection = connectionFactory.createConnection();
 8         //3.啓動鏈接
 9         connection.start();
10         /*4.獲取session  (參數1:是否啓動事務,
11          參數2:消息確認模式[
12          AUTO_ACKNOWLEDGE = 1    自動確認
13          CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
14          DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
15          SESSION_TRANSACTED = 0    事務提交併確認
16         ])*/
17         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
18         //5.建立隊列對象
19         Queue queue = session.createQueue("test-queue");
20         //6.建立消息生產者
21         MessageProducer producer = session.createProducer(queue);
22         //7.建立消息
23         TextMessage textMessage = session.createTextMessage("歡迎來到MQ世界");
24         //8.發送消息
25         producer.send(textMessage);
26         //9.關閉資源
27         producer.close();
28         session.close();
29         connection.close();
30     }
31 
32 }

(3)運行經過界面查看

4.1.2 消息消費者

(1)建立消息消費者

 1 public class QueueConsumer {
 2     public static void main(String[] args) throws JMSException, IOException {
 3         //1.建立鏈接工廠
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 5         //2.獲取鏈接
 6         Connection connection = connectionFactory.createConnection();
 7         //3.啓動鏈接
 8         connection.start();
 9         //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
10         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
11         //5.建立隊列對象
12         Queue queue = session.createQueue("test-queue");
13         //6.建立消息消費者
14         MessageConsumer consumer = session.createConsumer(queue);
15         //7.監聽消息
16         consumer.setMessageListener(new MessageListener() {
17             @Override
18             public void onMessage(Message message) {
19                 TextMessage textMessage = (TextMessage) message;
20                 try {
21                     System.out.println("接收到消息:"+textMessage.getText());
22                 } catch (JMSException e) {
23                     e.printStackTrace();
24                 }
25             }
26         });
27         //8.等待鍵盤輸入
28         System.in.read();
29         //9.關閉資源
30         consumer.close();
31         session.close();
32         connection.close();
33     }
34 
35 }

(2)運行查看控制檯輸出與經過界面查看

 4.2 發佈/訂閱模式 

4.2.1 消息生產者

 1 public class TopicProducer {
 2 
 3     public static void main(String[] args) throws JMSException {
 4         //1.建立鏈接工廠
 5         ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 6         //2.獲取鏈接
 7         Connection connection = connectionFactory.createConnection();
 8         //3.啓動鏈接
 9         connection.start();
10         /*4.獲取session  (參數1:是否啓動事務,
11          參數2:消息確認模式)*/
12         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
13         //5.建立主題對象
14         Topic topic = session.createTopic("test-topic"); 15         //6.建立消息生產者
16         MessageProducer producer = session.createProducer(topic); 17         //7.建立消息
18         TextMessage textMessage = session.createTextMessage("歡迎來到MQ世界!");
19         //8.發送消息
20         producer.send(textMessage);
21         //9.關閉資源
22         producer.close();
23         session.close();
24         connection.close();
25     }
26 
27 }

4.2.2 消息2個消費者(消費者2代碼同消費者1)

 1 public class TopicConsumer1 {
 2     public static void main(String[] args) throws JMSException, IOException {
 3         //1.建立鏈接工廠
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 5         //2.獲取鏈接
 6         Connection connection = connectionFactory.createConnection();
 7         //3.啓動鏈接
 8         connection.start();
 9         //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
10         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
11         //5.建立主題對象
12         Topic topic = session.createTopic("test-topic");
13         //6.建立消息消費者
14         MessageConsumer consumer = session.createConsumer(topic);
15         //7.監聽消息
16         consumer.setMessageListener(new MessageListener() {
17             @Override
18             public void onMessage(Message message) {
19                 TextMessage textMessage = (TextMessage) message;
20                 try {
21                     System.out.println("消費1--接收到消息:"+textMessage.getText());
22                 } catch (JMSException e) {
23                     e.printStackTrace();
24                 }
25             }
26         });
27         //8.等待鍵盤輸入
28         System.in.read();
29         //9.關閉資源
30         consumer.close();
31         session.close();
32         connection.close();
33     }
34 
35 }

4.2.3 運行查看測試結果

同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現每一個消費者會接收到消息。

4.3 總結

發佈訂閱的模式 默認的請狀況下:消息的內容不存在服務器,當生產者發送了一個消息,若是消費者以前沒有訂閱,就沒了。
點對點的方式:默認的請狀況下:將消息存儲在服務器上,消費者隨時來取,可是一旦一個消費者獲取到了消息,這個消息就沒有了。
相關文章
相關標籤/搜索