【學習】022 ActiveMQ

 

1、消息中間件概述

1.1消息中間件產生的背景

在客戶端與服務器進行通信時.客戶端調用後,必須等待服務對象完成處理返回結果才能繼續執行。java

 客戶與服務器對象的生命週期緊密耦合,客戶進程和服務對象進程都都必須正常運行;若是因爲服務對象崩潰或者網絡故障致使用戶的請求不可達,客戶會受到異常web

點對點通訊: 客戶的一次調用只發送給某個單獨的目標對象。spring

1.2 什麼是消息中間件

面向消息的中間件(MessageOrlented MiddlewareMOM)較好的解決了以上問
題。發送者將消息發送給消息服務器,消息服務器將消感存放在若千隊列中,在合適
的時候再將消息轉發給接收者。數據庫

這種模式下,發送和接收是異步的,發送者無需等
待; 兩者的生命週期未必相同: 發送消息的時候接收者不必定運行,接收消息的時候
發送者也不必定運行;一對多通訊: 對於一個消息能夠有多個接收者。apache

二 、JMS介紹

2.1 什麼是JMS?

JMS是java的消息服務,JMS的客戶端之間能夠經過JMS服務進行異步的消息傳輸。windows

2.2 什麼是消息模型

○ Point-to-Point(P2P) --- 點對點瀏覽器

○ Publish/Subscribe(Pub/Sub)---  發佈訂閱springboot

即:點對點發佈訂閱模型服務器

 

2.2.1 P2P (點對點)

P2P網絡

  1. P2P模式圖 

  2. 涉及到的概念 
    1. 消息隊列(Queue)
    2. 發送者(Sender)
    3. 接收者(Receiver)
    4. 每一個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。
  3. P2P的特色
    1. 每一個消息只有一個消費者(Consumer)(即一旦被消費,消息就再也不在消息隊列中)
    2. 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息以後,無論接收者有沒有正在運行,它不會影響到消息被髮送到隊列
    3. 接收者在成功接收消息以後需向隊列應答成功

若是你但願發送的每一個消息都應該被成功處理的話,那麼你須要P2P模式。

應用場景

A用戶與B用戶發送消息

 

2.2.2Pub/Sub (發佈與訂閱)

Pub/Sub模式圖 

 

涉及到的概念 :

主題(Topic)

發佈者(Publisher)

訂閱者(Subscriber) 
客戶端將消息發送到主題。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

Pub/Sub的特色

每一個消息能夠有多個消費者

發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。

爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。

若是你但願發送的消息能夠不被作任何處理、或者被一個消息者處理、或者能夠被多個消費者處理的話,那麼能夠採用Pub/Sub模型

消息的消費 
在JMS中,消息的產生和消息是異步的。對於消費來講,JMS的消息者能夠經過兩種方式來消費消息。 
○ 同步 
訂閱者或接收者調用receive方法來接收消息,receive方法在可以接收到消息以前(或超時以前)將一直阻塞 
○ 異步 
訂閱者或接收者能夠註冊爲一個消息監聽器。當消息到達以後,系統自動調用監聽器的onMessage方法。

  應用場景:

   用戶註冊、訂單修改庫存、日誌存儲

   畫圖演示

 

 MQ產品的分類

 

RabbitMQ

是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。

Redis

是一個Key-Value的NoSQL數據庫,開發維護很活躍,雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。

 

入隊

出隊

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

 

ZeroMQ

號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演了這個服務角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是down機,數據將會丟失。其中,Twitter的Storm中使用ZeroMQ做爲數據流的傳輸。

ActiveMQ

是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支持經常使用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。

Jafka/Kafka

Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式Publish/Subscribe消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現複雜均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制來統一了在線和離線的消息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。

其餘一些隊列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就再也不一一分析。

4、 ActiveMQ使用

4.1 、window下 ActiveMQ安裝

ActiveMQ部署其實很簡單,和全部Java同樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。

而後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,獲得解壓後的目錄結構以下圖:

進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操做系統的啓動腳本。

個人實驗環境是windowsXP,就進入win32目錄,會看到以下目錄結構。

其中activemq.bat即是啓動腳本,雙擊啓動。

ActiveMQ默認啓動到8161端口,啓動完了後在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼爲admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼後即可看到以下圖的ActiveMQ控制檯界面了。

4.1.1控制檯介紹

Number Of Consumers  消費者 這個是消費者端的消費者數量 
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數 
Messages Enqueued 進入隊列的消息  進入隊列的總數量,包括出隊列的。 這個數量只增不減 
Messages Dequeued 出了隊列的消息  能夠理解爲是消費這消費掉的數量 
這個要分兩種狀況理解 
在queues裏它和進入隊列的總數量相等(由於一個消息只會被成功消費一次),若是暫時不等是由於消費者還沒來得及消費。 
在 topics裏 它由於多消費者從而致使數量會比入隊列數高。 
簡單的理解上面的意思就是 
當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。 
當消息消費後,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1。
在來一條消息時,等待消費的消息是1,進入隊列的消息就是2。
沒有消費者時  Pending Messages   和 入隊列數量同樣 
有消費者消費的時候 Pedding會減小 出隊列會增長 
到最後 就是 入隊列和出隊列的數量同樣多 
以此類推,進入隊列的消息和出隊列的消息是池子,等待消費的消息是水流。 

4.2 、實現點對點通信模式

 使用ActiveMQ完成點對點(p2p)通信模式

引入pom文件jar包依賴

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

生產者

 

package com.hongmoshui;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producter
{
    public static void main(String[] args) throws JMSException
    {
        // ConnectionFactory :鏈接工廠,JMS
        // 用它建立鏈接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的鏈接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值my-queue是Query的名字
        Destination destination = session.createQueue("my-queue");
        // MessageProducer:消息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 發送一條消息
        for (int i = 1; i <= 5; i++)
        {
            sendMsg(session, producer, i);
        }
        connection.close();
    }

    /**
     * 在指定的會話上,經過指定的消息生產者發出一條消息
     * 
     * @param session 消息會話
     * @param producer 消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException
    {
        // 建立一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 經過消息生產者發出消息
        producer.send(message);
    }
}

消費者

package com.hongmoshui;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver
{
    public static void main(String[] args) throws JMSException
    {
        // ConnectionFactory :鏈接工廠,JMS
        // 用它建立鏈接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的鏈接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
        Destination destination = session.createQueue("my-queue");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        while (true)
        {
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message)
            {
                System.out.println("收到消息:" + message.getText());
            }
            else
                break;
        }
        session.close();
        connection.close();
    }
}

注:activeMQ的管理臺端口號默認爲8161,瀏覽器輸入http://127.0.0.1:8161,帳號:admin,密碼:admin

 

Number Of Consumers  消費者 這個是消費者端的消費者數量 
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數 
Messages Enqueued 進入隊列的消息  進入隊列的總數量,包括出隊列的。 這個數量只增不減 
Messages Dequeued 出了隊列的消息  能夠理解爲是消費這消費掉的數量 

4.3 、JMS消息可靠機制

ActiveMQ消息簽收機制:

客戶端成功接收一條消息的標誌是一條消息被簽收,成功應答。

消息的簽收情形分兩種:

一、帶事務的session

   若是session帶有事務,而且事務成功提交,則消息被自動簽收。若是事務回滾,則消息會被再次傳送。

二、不帶事務的session

   不帶事務的session的簽收方式,取決於session的配置。

   Activemq支持一下三種模式:

   Session.AUTO_ACKNOWLEDGE  消息自動簽收

   Session.CLIENT_ACKNOWLEDGE  客戶端調用acknowledge方法手動簽收

textMessage.acknowledge();//手動簽收

   Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息

只有在被確認以後,才認爲已經被成功地消費了。消息的成功消費一般包含三個階段:客戶接收消息、客戶處理消息和消息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息什麼時候被確認取決於建立會話時的應答模式(acknowledgement mode)。

4.4 、發佈訂閱

生產者:

package com.hongmoshui;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPSend
{

    private static String BROKERURL = "tcp://127.0.0.1:61616";

    private static String TOPIC = "my-topic";

    public static void main(String[] args) throws JMSException
    {
        start();
    }

    static public void start() throws JMSException
    {
        System.out.println("生產者已經啓動....");
        // 建立ActiveMQConnectionFactory
        // 會話工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 啓動JMS 鏈接
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        send(producer, session);
        System.out.println("發送成功!");
        connection.close();
    }

    static public void send(MessageProducer producer, Session session) throws JMSException
    {
        for (int i = 1; i <= 5; i++)
        {
            System.out.println("我是消息" + i);
            TextMessage textMessage = session.createTextMessage("我是消息" + i);
            Destination destination = session.createTopic(TOPIC);
            producer.send(destination, textMessage);
        }
    }

}

消費者:

 

package com.hongmoshui;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPReceiver
{
    private static String BROKERURL = "tcp://127.0.0.1:61616";

    private static String TOPIC = "my-topic";

    public static void main(String[] args) throws JMSException
    {
        start();
    }

    static public void start() throws JMSException
    {
        System.out.println("消費點啓動...");
        // 建立ActiveMQConnectionFactory
        // 會話工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 啓動JMS 鏈接
        connection.start();
        // 不開消息啓事物,消息主要發送消費者,則表示消息已經簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 建立一個隊列
        Topic topic = session.createTopic(TOPIC);
        MessageConsumer consumer = session.createConsumer(topic);
        // consumer.setMessageListener(new
        // MsgListener());
        while (true)
        {
            TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null)
            {
                System.out.println("接受到消息:" + textMessage.getText());
                // textMessage.acknowledge();//
                // 手動簽收
                // session.commit();
            }
            else
            {
                break;
            }
        }
        connection.close();
    }

}

4.5 、SpringBoot整合ActiveMQ

生產者:

4.5.1 引入 maven依賴

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- spring boot web支持:mvc,aop... -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

4.5.2 引入 application.yml配置

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
queue: springboot-queue
server:
  port: 8080

4.5.3 建立QueueConfig

package com.hongmoshui.config;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig
{
    @Value("${queue}")
    private String queue;

    @Bean
    public Queue logQueue()
    {
        return new ActiveMQQueue(queue);
    }
}

4.5.4 建立Producer

package com.hongmoshui.producer;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class Producer
{
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Scheduled(fixedDelay = 5000)
    public void send()
    {
        jmsMessagingTemplate.convertAndSend(queue, "測試消息隊列" + System.currentTimeMillis());
    }
}

4.5.5 啓動

package com.hongmoshui;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class MQProducerStartApp
{
    public static void main(String[] args)
    {
        SpringApplication.run(MQProducerStartApp.class, args);
    }
}

消費者:

4.5.1 yml文件配置修改,端口號改爲8081

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
queue: springboot-queue
server:
  port: 8081

4.5.1 建立Consumer

package com.hongmoshui.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer
{

    @JmsListener(destination = "${queue}")
    public void receive(String msg)
    {
        System.out.println("監聽器收到msg:" + msg);
    }

}

4.5.1 啓動

package com.hongmoshui;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MQConSumerStartApp
{
    public static void main(String[] args)
    {
        SpringApplication.run(MQConSumerStartApp.class, args);
    }
}

使用消息中間注意事項

  1. 消費者代碼不要拋出異常,不然activqmq默認有重試機制。
  2. 若是代碼發生異常,須要發佈版本才能夠解決的問題,不要使用重試機制,採用日誌記錄方式,定時Job進行補償。
  3. 若是不須要發佈版本解決的問題,能夠採用重試機制進行補償。

消費者若是保證消息冪等性,不被重複消費。

產生緣由:網絡延遲傳輸中,會形成進行MQ重試中,在重試過程當中,可能會形成重複消費。

解決辦法:

1.使用全局MessageID 判斷消費方使用同一個,解決冪等性。

2.使用JMS可靠消息機制

相關文章
相關標籤/搜索