初識消息中間件之 ==> ActiveMQ

1、消息隊列概述java

  消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。web

  消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。spring

  消息隊列中間件是分佈式系統中重要的組件,主要解決異步消息,流量削峯,應用耦合等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。目前使用較多的消息隊列產品有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。數據庫

生活中的例子apache

老式餐廳點餐後需呆在座位上等餐,中途不能離開去幹別的事,若是離開去幹別的事,餐好了,點餐人殊不知道。瀏覽器

新式餐廳點餐後,餐廳會提供一個「電子盤」給顧客,顧客能夠不用在店裏等餐,能夠去附近逛逛,買買東西,等餐好了,手上的「電子盤」就會響,通知顧客能夠回去就餐了。springboot

對比以上兩種形式,第二種情形就像消息隊列同樣,點完餐之後就能夠去處理別的事情,不用一直在餐廳等着。網絡

2、消息隊列的做用session

上面說了消息隊列主要解決了異步處理,流量削峯,應用耦合等三個方面的問題。架構

異步處理

場景說明:用戶註冊後,系統要發送註冊郵件和註冊短信。傳統的方式有兩種,串行模式和並行方式 。

串行模式:將註冊信息存入數據庫成功後,先發送註冊郵件再發送註冊短信,以上三個步驟都完成後,將成功的信息返回給客戶端。

並行模式:將註冊信息存入數據庫成功後,發送郵件的同時發送註冊短信,以上三個任務都完成後返回給客戶端,與串行模式的差異是並行模式能夠提升處理的時間。

假設每一個業務結點的處理時間爲50ms,不考慮網絡開銷,則串行模式的時間爲150ms,並行模式的時間爲100ms。

若是引入消息隊列,可以大大縮短響應時間,以下:

用戶註冊信息寫入數據庫後,再將發送郵件和短信寫入消息隊列,而後直接返回註冊結果,總共耗時55m,是並行的一半左右,是串行的三分之一左右,大大提升了系統的處理能力。

應用解耦

場景說明:用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,訂單系統調用庫存系統的接口,如圖所示:

傳統模式的缺點:

  • 假如庫存系統沒法訪問,則訂單減庫存將失敗,從而致使下單失敗;
  • 訂單系統與庫存系統耦合;

引入消息隊列的方案以下:

  • 訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
  • 庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。
  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統與庫存系統的應用解耦。

流量削峯

場景說明:業務系統處理能力遠遠大於支付渠道處理能力,假如不控制流量把所有請求往支付渠道發送,支付渠道可能會掛掉,致使整個業務不能成功。

這時引入消息隊列,控制流量,讓請求有序的進入支付渠道

日誌處理

日誌處理是指將消息隊列用在日誌處理中,好比 Kafka 的應用,解決大量日誌傳輸的問題。架構簡化以下:


  • 日誌採集客戶端,負責日誌數據採集,定時寫受寫入Kafka隊列;
  • Kafka消息隊列,負責日誌數據的接收,存儲和轉發;
  • 日誌處理應用:訂閱並消費kafka隊列中的日誌數據;

3、Active MQ

下載

http://activemq.apache.org/components/classic/download/

安裝

直接解壓,而後移動到指定目錄便可。

>tar zxvf apache-activemq-5.15.10-bin.tar.gz
>mv ./apache-activemq-5.15.10 /usr/local

啓動

>/usr/local/activemq-5.15.10/bin/activemq start

# 檢查啓動狀態
[root@cbooy bin]# jps
3168 Jps
2268 activemq.jar

# activemq啓動的默認端口號 61616
[root@cbooy bin]# lsof -i:61616
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    2268 root  132u  IPv6  15719      0t0  TCP *:61616 (LISTEN)

其餘基本命令

> activemq restart # 重啓
> activemq stop    # 關閉
> activemq start > /activemq_home/logs/activemq.log   # 落地相關信息,打印日誌

指定配置文件的啓動

./bin/activemq start xbean:/usr/local/activemq-5.15.10/conf/activemq.xml

後臺圖形化界面支持

  • http://127.0.0.1:8161/admin
    • 默認用戶名/密碼, admin/admin
  • 圖形化頁面相關信息說明
    • Number Of Pending Messages
      • 等待消費的消息
      • 未出隊列的數量
    • Number Of Consumers
      • 消費者數量
    • Messages Enqueued
      • 進隊消息數,進入隊列的總數包括出隊的消息數
    • Messages Dequeued
      • 出隊消息數,即消費者消費後的消息

4、Java操做ActiveMQ

依賴 jar 包

dependencies {
    compile('org.apache.activemq:activemq-all:5.15.9')
    compile('org.apache.activemq:activemq-pool:5.15.9')
}

第一種模式:Queue

生產流程

  • 建立鏈接工廠對象
  • 從工廠中創建一個鏈接並開啓(Connection)
  • 從鏈接中創建一個會話(Session)
  • 基於會話創建目的地(Queue)
  • 基於會話建立生產者(Producer)
  • 在會話的基礎上建立一條消息(Message)
  • 生產者將消息發出
  • 資源關閉
public class Producer {

  // activemq服務的地址,默認通訊端口爲61616
  private static final String URL = "tcp://192.168.182.128:61616";

  // 定義隊列的名稱
  private static final String QUEUE_NAME = "test-Queue";

  public static void main(String[] args) {

    MessageProducer producer = null;
    Session session = null;
    Connection connection = null;

    try {
      // 建立鏈接工廠對象
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

      // 從工廠中創建一個鏈接並開啓(Connection)
      connection = connectionFactory.createConnection();
      connection.start();

      // 從鏈接中創建一個會話(Session)
      session = connection.createSession(false, 1);

      // 基於會話創建隊列(Queue)
      Queue queue = session.createQueue(QUEUE_NAME);

      // 基於會話建立生產者(Producer)
      producer = session.createProducer(queue);

      for (int i = 0; i < 10; i++) {

        // 在會話的基礎上建立一條消息(Message)
        TextMessage textMessage = session.createTextMessage("test-mq:" + i);
        // 生產者將消息發出
        producer.send(textMessage);
      }
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
      // 資源關閉
    } finally {
      try {
        if (null != producer) {
          producer.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        if (null != session) {
          session.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        if (null != connection) {
          connection.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}
Producer

執行以上代碼後,咱們能夠在管理頁面上看到以下狀況:

消費流程

  • 建立鏈接工廠對象
  • 從工廠中創建一個鏈接並開啓(Connection)
  • 從鏈接中創建一個會話(Session)
  • 基於會話創建目的地(Queue)
  • 基於會話建立消費者(Consumer)
  • 消費者接收消息
  • 資源關閉
public class Consumer {

  // activemq服務地址,默認通訊端口爲61616
  private static final String URL = "tcp://192.168.182.128:61616";

  // 定義隊列的名稱
  private static final String QUEUE_NAME = "test-Queue";

  public static void main(String[] args) {

    MessageConsumer consumer = null;
    Session session = null;
    Connection connection = null;

    try {
      // 建立鏈接工廠對象
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

      // 從工廠中創建一個鏈接並開啓(Connection)
      connection = connectionFactory.createConnection();
      connection.start();

      // 從鏈接中創建一個會話(Session)
      session = connection.createSession(false, 1);

      // 基於會話創建隊列(Queue)
      Queue queue = session.createQueue(QUEUE_NAME);

      // 基於會話建立消費者(Consumer)
      consumer = session.createConsumer(queue);

      // 接收消息的第一種方式,阻塞式接收
      // Message message = consumer.receive();
      // System.out.println("consumer recive message = " + message);

      // 接收消息的第二種方式,使用監聽器
      consumer.setMessageListener(msg -> {
        TextMessage textMessage = (TextMessage) msg;
        try {
          System.out.println("textMessage = " + textMessage.getText());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    }
  }
}
Consumer

執行以上代碼後,咱們能夠在管理頁面上看到以下狀況:

咱們此次先運行兩個 Consumer,因爲 Consumer 種沒有關閉資源,因此會一直保持和 ActiveMQ的鏈接。

而後再運行 Producer,咱們來看看現象:

控制檯打印的信息中,Consumer1 消費的信息都是偶數的,Consumer2 消費的信息都是奇數的,一條消息只能被一個Consumer消費。

第二種模式:Topic

生產流程

  • 建立鏈接工廠對象
  • 從工廠中創建一個鏈接並開啓(Connection)
  • 從鏈接中創建一個會話(Session)
  • 基於會話創建目的地(Topic)
  • 基於會話建立生產者(Producer)
  • 在會話的基礎上建立一條消息(Message)
  • 生產者將消息發出
  • 資源關閉
public class Producer {

  // activemq服務地址,默認通訊端口爲61616
  private static final String URL = "tcp://192.168.182.128:61616";

  // 定義隊列的名稱
  private static final String TOPIC_NAME = "test-Topic";

  public static void main(String[] args) {

    MessageProducer producer = null;
    Session session = null;
    Connection connection = null;

    try {
      // 建立鏈接工廠對象
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

      // 從工廠中創建一個鏈接並開啓(Connection)
      connection = connectionFactory.createConnection();
      connection.start();

      // 從鏈接中創建一個會話(Session)
      session = connection.createSession(false, 1);

      // 基於會話創建目的地(Topic)
      Topic topic = session.createTopic(TOPIC_NAME);

      // 基於會話建立生產者(Producer)
      producer = session.createProducer(topic);

      for (int i = 0; i < 10; i++) {

        // 在會話的基礎上建立一條消息(Message)
        TextMessage textMessage = session.createTextMessage("test-topic:" + i);
        // 生產者將消息發出
        producer.send(textMessage);
      }
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
      // 資源關閉
    } finally {
      try {
        if (null != producer) {
          producer.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        if (null != session) {
          session.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        if (null != connection) {
          connection.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}
Producer

消費流程

  • 建立鏈接工廠對象
  • 從工廠中創建一個鏈接並開啓(Connection)
  • 從鏈接中創建一個會話(Session)
  • 基於會話創建目的地(Topic)
  • 基於會話建立消費者(Consumer)
  • 消費者接收消息
  • 資源關閉
public class Consumer1 {

  // activemq服務的地址,默認通訊端口爲61616
  private static final String URL = "tcp://192.168.182.128:61616";

  // 定義隊列的名稱
  private static final String TOPIC_NAME = "test-Topic";

  public static void main(String[] args) {

    MessageConsumer consumer = null;
    Session session = null;
    Connection connection = null;

    try {
      // 建立鏈接工廠對象
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

      // 從工廠中創建一個鏈接並開啓(Connection)
      connection = connectionFactory.createConnection();
      connection.start();

      // 從鏈接中創建一個會話(Session)
      session = connection.createSession(false, 1);

      // 基於會話創建目的地(Topic)
      Topic topic = session.createTopic(TOPIC_NAME);

      // 基於會話建立消費者(Consumer)
      consumer = session.createConsumer(topic);

      // 接收消息的第一種方式,阻塞式接收
      // Message message = consumer.receive();
      // System.out.println("consumer recive message = " + message);

      // 接收消息的第二種方式,使用監聽器
      consumer.setMessageListener(msg -> {
        TextMessage textMessage = (TextMessage) msg;
        try {
          System.out.println("textMessage = " + textMessage.getText());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });

    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    }
  }
}
Consumer

Queue 模式和 Topic 模式,代碼十分類似,一個是建立Queue,而另一個是建立Topic。

如今咱們來運行三個 Consumer,再運行 Producer,來看看現象

控制檯打印的信息中,三個Consumer 都消費了全部消息,一條消息只能被多個 Consumer消費。

5、SpringBoot整合ActiveMQ

Queue模式

Producer端:

一、引入依賴

dependencies {
    implementation('org.springframework.boot:spring-boot-starter-web')
    implementation('org.springframework.boot:spring-boot-starter-aop')
    testImplementation('org.springframework.boot:spring-boot-starter-test')

    // 導入activemq啓動器依賴
    implementation('org.springframework.boot:spring-boot-starter-activemq')
}
引入依賴

二、新建 application.yaml 配置文件並進行基本配置

server:
  port: 8888
  servlet:
    context-path: /queue-producer

spring:
  activemq:
    broker-url: tcp://192.168.182.128:61616
application.yaml

三、建立配置類

@EnableJms
@Configuration
public class ProducerConfig {

  @Bean
  public Queue createQueue(){
    return new ActiveMQQueue("springboot-queue");
  }
}
ProducerConfig

四、建立 Producer 類

@Component
public class QueueProducer {

  @Autowired
  private Queue queue;

  @Autowired
  private JmsMessagingTemplate jmsTemplate;

  public String sendMsg(String msg) {

    jmsTemplate.convertAndSend(queue, msg);

    return "send success";
  }
}
QueueProducer

五、建立 Controller 接收消息

@Slf4j
@RestController
public class ProducerController {

  @Autowired
  private QueueProducer producer;

  @RequestMapping("/producer")
  public String produce(String msg) {
    log.info("spring boot produce msg={}", msg);
    return producer.sendMsg(msg);
  }
}
ProducerController

六、建立啓動類

@SpringBootApplication
public class ProducerApplication {

  public static void main(String[] args) {
    SpringApplication.run(ProducerApplication.class);
  }
}
ProducerApplication

Consumer端:

一、引入依賴

和 Producer 端同樣

二、新建 application.yaml 配置文件並進行基本配置

server:
  port: 9999

spring:
  activemq:
    broker-url: tcp://192.168.182.128:61616
application.yaml

三、建立 Consumer 類

@Slf4j
@Component
public class QueueConsumer {

  @JmsListener(destination = "springboot-queue")
  public void recive(String msg) {
    log.info("spring boot queue consumer receive msg={}", msg);
  }
}
QueueConsumer

四、建立啓動類

@EnableJms
@SpringBootApplication
public class ConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(ConsumerApplication.class);
  }
}
ConsumerApplication

驗證:

分別把 Producer端和 Consumer端都啓動起來,而後在瀏覽器中發送 Get請求,Producer端接收請求並將消息發給 ActiveMQ服務端,而後 Consumer端接收到 ActiveMQ的消息。

Topic模式

topci模式的實現和queue模式基本同樣,只是有一處不太同樣, Producer端和 Consumer端的配置類都須要多配置一個 ContainerFactory,以下:

@Bean
public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){
  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  // topic類型消息必須設置爲true,false則表示是queue類型
  factory.setPubSubDomain(true);
  return factory;
}

同時在 @JmsListener註解中,須要加上上面這個方法,以下:

@Slf4j
@Component
public class TopicConsumer {

  @JmsListener(destination = "springboot-topic",containerFactory = "topicListenerContainerFactory")
  public void recive(String msg){
    log.info("spring boot topic consumer recive msg={} ",msg);
  }
}
相關文章
相關標籤/搜索