1、消息隊列概述java
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。web
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。spring
消息隊列中間件是分佈式系統中重要的組件,主要解決異步消息,流量削峯,應用耦合等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。目前使用較多的消息隊列產品有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。數據庫
生活中的例子apache
老式餐廳點餐後需呆在座位上等餐,中途不能離開去幹別的事,若是離開去幹別的事,餐好了,點餐人殊不知道。瀏覽器
新式餐廳點餐後,餐廳會提供一個「電子盤」給顧客,顧客能夠不用在店裏等餐,能夠去附近逛逛,買買東西,等餐好了,手上的「電子盤」就會響,通知顧客能夠回去就餐了。springboot
對比以上兩種形式,第二種情形就像消息隊列同樣,點完餐之後就能夠去處理別的事情,不用一直在餐廳等着。網絡
2、消息隊列的做用session
上面說了消息隊列主要解決了異步處理,流量削峯,應用耦合等三個方面的問題。架構
異步處理
場景說明:用戶註冊後,系統要發送註冊郵件和註冊短信。傳統的方式有兩種,串行模式和並行方式 。
串行模式:將註冊信息存入數據庫成功後,先發送註冊郵件再發送註冊短信,以上三個步驟都完成後,將成功的信息返回給客戶端。
並行模式:將註冊信息存入數據庫成功後,發送郵件的同時發送註冊短信,以上三個任務都完成後返回給客戶端,與串行模式的差異是並行模式能夠提升處理的時間。
假設每一個業務結點的處理時間爲50ms,不考慮網絡開銷,則串行模式的時間爲150ms,並行模式的時間爲100ms。
若是引入消息隊列,可以大大縮短響應時間,以下:
用戶註冊信息寫入數據庫後,再將發送郵件和短信寫入消息隊列,而後直接返回註冊結果,總共耗時55m,是並行的一半左右,是串行的三分之一左右,大大提升了系統的處理能力。
應用解耦
場景說明:用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,訂單系統調用庫存系統的接口,如圖所示:
傳統模式的缺點:
引入消息隊列的方案以下:
流量削峯
場景說明:業務系統處理能力遠遠大於支付渠道處理能力,假如不控制流量把所有請求往支付渠道發送,支付渠道可能會掛掉,致使整個業務不能成功。
這時引入消息隊列,控制流量,讓請求有序的進入支付渠道
日誌處理
日誌處理是指將消息隊列用在日誌處理中,好比 Kafka 的應用,解決大量日誌傳輸的問題。架構簡化以下:
3、Active MQ
下載
http://activemq.apache.org/components/classic/download/
安裝
直接解壓,而後移動到指定目錄便可。
>tar zxvf apache-activemq-5.15.10-bin.tar.gz >mv ./apache-activemq-5.15.10 /usr/local
啓動
>/usr/local/activemq-5.15.10/bin/activemq start # 檢查啓動狀態 [root@cbooy bin]# jps 3168 Jps 2268 activemq.jar # activemq啓動的默認端口號 61616 [root@cbooy bin]# lsof -i:61616 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2268 root 132u IPv6 15719 0t0 TCP *:61616 (LISTEN)
其餘基本命令
> activemq restart # 重啓 > activemq stop # 關閉 > activemq start > /activemq_home/logs/activemq.log # 落地相關信息,打印日誌
指定配置文件的啓動
./bin/activemq start xbean:/usr/local/activemq-5.15.10/conf/activemq.xml
後臺圖形化界面支持
4、Java操做ActiveMQ
依賴 jar 包
dependencies { compile('org.apache.activemq:activemq-all:5.15.9') compile('org.apache.activemq:activemq-pool:5.15.9') }
第一種模式:Queue
生產流程
public class Producer { // activemq服務的地址,默認通訊端口爲61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定義隊列的名稱 private static final String QUEUE_NAME = "test-Queue"; public static void main(String[] args) { MessageProducer producer = null; Session session = null; Connection connection = null; try { // 建立鏈接工廠對象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 從工廠中創建一個鏈接並開啓(Connection) connection = connectionFactory.createConnection(); connection.start(); // 從鏈接中創建一個會話(Session) session = connection.createSession(false, 1); // 基於會話創建隊列(Queue) Queue queue = session.createQueue(QUEUE_NAME); // 基於會話建立生產者(Producer) producer = session.createProducer(queue); for (int i = 0; i < 10; i++) { // 在會話的基礎上建立一條消息(Message) TextMessage textMessage = session.createTextMessage("test-mq:" + i); // 生產者將消息發出 producer.send(textMessage); } } catch (Exception ex) { throw new IllegalStateException(ex); // 資源關閉 } finally { try { if (null != producer) { producer.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != session) { session.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
執行以上代碼後,咱們能夠在管理頁面上看到以下狀況:
消費流程
public class Consumer { // activemq服務地址,默認通訊端口爲61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定義隊列的名稱 private static final String QUEUE_NAME = "test-Queue"; public static void main(String[] args) { MessageConsumer consumer = null; Session session = null; Connection connection = null; try { // 建立鏈接工廠對象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 從工廠中創建一個鏈接並開啓(Connection) connection = connectionFactory.createConnection(); connection.start(); // 從鏈接中創建一個會話(Session) session = connection.createSession(false, 1); // 基於會話創建隊列(Queue) Queue queue = session.createQueue(QUEUE_NAME); // 基於會話建立消費者(Consumer) consumer = session.createConsumer(queue); // 接收消息的第一種方式,阻塞式接收 // Message message = consumer.receive(); // System.out.println("consumer recive message = " + message); // 接收消息的第二種方式,使用監聽器 consumer.setMessageListener(msg -> { TextMessage textMessage = (TextMessage) msg; try { System.out.println("textMessage = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); } catch (Exception ex) { throw new IllegalStateException(ex); } } }
執行以上代碼後,咱們能夠在管理頁面上看到以下狀況:
咱們此次先運行兩個 Consumer,因爲 Consumer 種沒有關閉資源,因此會一直保持和 ActiveMQ的鏈接。
而後再運行 Producer,咱們來看看現象:
控制檯打印的信息中,Consumer1 消費的信息都是偶數的,Consumer2 消費的信息都是奇數的,一條消息只能被一個Consumer消費。
第二種模式:Topic
生產流程
public class Producer { // activemq服務地址,默認通訊端口爲61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定義隊列的名稱 private static final String TOPIC_NAME = "test-Topic"; public static void main(String[] args) { MessageProducer producer = null; Session session = null; Connection connection = null; try { // 建立鏈接工廠對象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 從工廠中創建一個鏈接並開啓(Connection) connection = connectionFactory.createConnection(); connection.start(); // 從鏈接中創建一個會話(Session) session = connection.createSession(false, 1); // 基於會話創建目的地(Topic) Topic topic = session.createTopic(TOPIC_NAME); // 基於會話建立生產者(Producer) producer = session.createProducer(topic); for (int i = 0; i < 10; i++) { // 在會話的基礎上建立一條消息(Message) TextMessage textMessage = session.createTextMessage("test-topic:" + i); // 生產者將消息發出 producer.send(textMessage); } } catch (Exception ex) { throw new IllegalStateException(ex); // 資源關閉 } finally { try { if (null != producer) { producer.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != session) { session.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
消費流程
public class Consumer1 { // activemq服務的地址,默認通訊端口爲61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定義隊列的名稱 private static final String TOPIC_NAME = "test-Topic"; public static void main(String[] args) { MessageConsumer consumer = null; Session session = null; Connection connection = null; try { // 建立鏈接工廠對象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 從工廠中創建一個鏈接並開啓(Connection) connection = connectionFactory.createConnection(); connection.start(); // 從鏈接中創建一個會話(Session) session = connection.createSession(false, 1); // 基於會話創建目的地(Topic) Topic topic = session.createTopic(TOPIC_NAME); // 基於會話建立消費者(Consumer) consumer = session.createConsumer(topic); // 接收消息的第一種方式,阻塞式接收 // Message message = consumer.receive(); // System.out.println("consumer recive message = " + message); // 接收消息的第二種方式,使用監聽器 consumer.setMessageListener(msg -> { TextMessage textMessage = (TextMessage) msg; try { System.out.println("textMessage = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); } catch (Exception ex) { throw new IllegalStateException(ex); } } }
Queue 模式和 Topic 模式,代碼十分類似,一個是建立Queue,而另一個是建立Topic。
如今咱們來運行三個 Consumer,再運行 Producer,來看看現象
控制檯打印的信息中,三個Consumer 都消費了全部消息,一條消息只能被多個 Consumer消費。
5、SpringBoot整合ActiveMQ
Queue模式
Producer端:
一、引入依賴
dependencies { implementation('org.springframework.boot:spring-boot-starter-web') implementation('org.springframework.boot:spring-boot-starter-aop') testImplementation('org.springframework.boot:spring-boot-starter-test') // 導入activemq啓動器依賴 implementation('org.springframework.boot:spring-boot-starter-activemq') }
二、新建 application.yaml 配置文件並進行基本配置
server: port: 8888 servlet: context-path: /queue-producer spring: activemq: broker-url: tcp://192.168.182.128:61616
三、建立配置類
@EnableJms @Configuration public class ProducerConfig { @Bean public Queue createQueue(){ return new ActiveMQQueue("springboot-queue"); } }
四、建立 Producer 類
@Component public class QueueProducer { @Autowired private Queue queue; @Autowired private JmsMessagingTemplate jmsTemplate; public String sendMsg(String msg) { jmsTemplate.convertAndSend(queue, msg); return "send success"; } }
五、建立 Controller 接收消息
@Slf4j @RestController public class ProducerController { @Autowired private QueueProducer producer; @RequestMapping("/producer") public String produce(String msg) { log.info("spring boot produce msg={}", msg); return producer.sendMsg(msg); } }
六、建立啓動類
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class); } }
Consumer端:
一、引入依賴
和 Producer 端同樣
二、新建 application.yaml 配置文件並進行基本配置
server: port: 9999 spring: activemq: broker-url: tcp://192.168.182.128:61616
三、建立 Consumer 類
@Slf4j @Component public class QueueConsumer { @JmsListener(destination = "springboot-queue") public void recive(String msg) { log.info("spring boot queue consumer receive msg={}", msg); } }
四、建立啓動類
@EnableJms @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class); } }
驗證:
分別把 Producer端和 Consumer端都啓動起來,而後在瀏覽器中發送 Get請求,Producer端接收請求並將消息發給 ActiveMQ服務端,而後 Consumer端接收到 ActiveMQ的消息。
Topic模式
topci模式的實現和queue模式基本同樣,只是有一處不太同樣, Producer端和 Consumer端的配置類都須要多配置一個 ContainerFactory,以下:
@Bean public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // topic類型消息必須設置爲true,false則表示是queue類型 factory.setPubSubDomain(true); return factory; }
同時在 @JmsListener註解中,須要加上上面這個方法,以下:
@Slf4j @Component public class TopicConsumer { @JmsListener(destination = "springboot-topic",containerFactory = "topicListenerContainerFactory") public void recive(String msg){ log.info("spring boot topic consumer recive msg={} ",msg); } }