程序員之消息隊列

1、什麼是消息隊列
MQ (Message Quene) : 翻譯爲 消息隊列 ,經過典型的 ⽣產者 和 消費者 模型,⽣產者不斷向消息隊列中⽣產消息,消費者不斷的從隊列中獲取消息。由於消息的⽣產和消費都是異步的,⽽且只關⼼消息的發送和接收,沒有業務邏輯的侵⼊,輕鬆的實現系統間解耦。別名爲 消息中間件 經過利⽤⾼效可靠的消息傳遞機制進⾏平臺⽆關的數據交流,並基於數據通訊來進⾏分佈式系統的集成。html

2、爲何要使用MQ
1.解耦
如今我有一個系統A,系統A能夠產生一個userId,而後,如今有系統B和系統C都須要這個userId去作相關的操做,能夠寫成以下操做web

若是有一天,系統B的負責人告訴系統A的負責人,如今系統B的SystemBNeed2do(String userId)這個接口再也不使用了,讓系統A別去調它了。那麼就須要從代碼的基礎上去修改了。這樣緊密的耦合關係會致使不少麻煩,若是使用消息中間件就不會出現以上問題。spring

2.異步
咱們再來看看下面這種狀況:系統A仍是直接調用系統B、C、D瀏覽器

假設系統A運算出userId具體的值須要50ms,調用系統B的接口須要300ms,調用系統C的接口須要300ms,調用系統D的接口須要300ms。那麼此次請求就須要50+300+300+300=950ms
而且咱們得知,系統A作的是主要的業務,而系統B、C、D是非主要的業務。好比系統A處理的是訂單下單,而系統B是訂單下單成功了,那發送一條短信告訴具體的用戶此訂單已成功,而系統C和系統D也是處理一些小事而已。
那麼此時,爲了提升用戶體驗和吞吐量,其實能夠異步地調用系統B、C、D的接口。因此,咱們能夠弄成是這樣的:緩存

系統A執行完了之後,將userId寫到消息隊列中,而後就直接返回了(至於其餘的操做,則異步處理)。
原本整個請求須要用950ms(同步)
如今將調用其餘系統接口異步化,只須要100ms(異步)springboot

三、削峯/限流
假設如今咱們每月要搞一次大促,大促期間的併發可能會很高的,好比每秒3000個請求。假設咱們如今有兩臺機器處理請求,而且每臺機器只能每次處理1000個請求。 架構

那多出來的1000個請求,可能就把咱們整個系統給搞崩了...因此,有一種辦法,咱們能夠寫到消息隊列中:併發

系統B和系統C根據本身的可以處理的請求數去消息隊列中拿數據,這樣即使有每秒有8000個請求,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統本身去控制,這樣就不會把整個系統給搞崩。app

3、經常使用的消息中間件
當今市⾯上有不少主流的消息中間件,如⽼牌的 ActiveMQ 、 RabbitMQ ,炙⼿可熱的Kafka ,阿⾥巴巴⾃主開發 RocketMQ 等。
#1.ActiveMQ
ActiveMQ 是Apache出品,最流⾏的,能⼒強勁的開源消息總線。它是⼀個徹底⽀持JMS規範的的消息中間件。豐富的API,多種集羣架構模式讓ActiveMQ在業界成爲⽼牌的消息中間件,在中⼩型企業頗受歡迎!異步

2.Kafka

Kafka是LinkedIn開源的分佈式發佈-訂閱消息系統,⽬前歸屬於Apache頂級項⽬。Kafka主要特色是基於Pull的模式來處理消息消費,追求⾼吞吐量,⼀開始的⽬的就是⽤於⽇志收集和傳輸。0.8版本開始⽀持複製,不⽀持事務,對消息的重複、丟失、錯誤沒有嚴格要求,適合產⽣⼤量數據的互聯⽹服務的數據收集業務。

3.RocketMQ

RocketMQ是阿⾥開源的消息中間件,它是純Java開發,具備⾼吞吐量、⾼可⽤性、適合⼤規模分
布式系統應⽤的特色。RocketMQ思路起源於Kafka,但並非Kafka的⼀個Copy,它
對消息的可靠傳輸及事務性作了優化,⽬前在阿⾥集團被⼴泛應⽤於交易、充值、流計算、消
息推送、⽇志流式處理、binglog分發等場景。

4.RabbitMQ

RabbitMQ⽐Kafka可靠,Kafka更適合IO⾼吞吐的處理,⼀般應⽤在⼤數據⽇志處理或對實時性(少許延遲),可靠性(少許丟數據)要求稍低的場景使⽤,⽐如ELK⽇志收集。

4、RabbitMQ
基於 AMQP 協議,erlang語⾔開發,是部署最⼴泛的開源消息中間件,是最受歡迎的開源消息中間件之⼀。
官⽹ : https://www.rabbitmq.com/
官⽅教程 : https://www.rabbitmq.com/#getstarted

1.AMQP 協議
AMQP(advanced message queuingprotocol)`在2003年時被提出,最先⽤於解決⾦融領不一樣平臺之間的消息傳遞交互問題。顧名思義,AMQP是⼀種協議,更準確的說是⼀種binary wire-level protocol(連接協議)。這是其和JMS的本質差異,AMQP不從API層進⾏限定,⽽是直接定義⽹
絡交換的數據格式。這使得實現了AMQP的provider自然性就是跨平臺的。如下是AMQP協議模型:

2.RabbitMQ 的安裝
下載地址:https://www.rabbitmq.com/download.html

注意:處理要下載rabbitmq的按轉包外還要下載ellong環境

3.安裝步驟
先安裝安裝Erlang,再安裝rabbitmq.exe,直接下一步,傻瓜式安裝,安裝完成點擊開始會出現這裏會出現啓動、中止、從新安裝等

1.點擊

2.輸入命令:
rabbitmq-plugins enable rabbitmq_management

3.打開瀏覽器控制檯
http://localhost:15672/
默認帳號 guest guest
若是不能訪問,
文件夾爲隱藏 須要再文件夾選項中把隱藏文件夾打開顯示
C:\Users\XYH\AppData\Roaming\RabbitMQ\db 裏面的數據刪除 再次安裝一下Rabbitmq.exe
而後執行
rabbitmq-plugins enable rabbitmq_management
就能夠訪問到了

5、web管理界⾯介紹
1.簡介

connections:⽆論⽣產者仍是消費者,都須要與RabbitMQ建⽴鏈接後才能夠完成消息的⽣產和消費,在這⾥能夠查看鏈接狀況
channels:通道,建⽴鏈接後,會造成通道,消息的投遞獲取依賴通道。
Exchanges:交換機,⽤來實現消息的路由
Queues:隊列,即消息隊列,消息存放在隊列中,等待消費,消費後被移除隊列。
admin:用戶

2.Admin⽤戶和虛擬主機管理
a.添加用戶

上⾯的Tags選項,實際上是指定⽤戶的⻆⾊,可選的有如下⼏個:
==》超級管理員(administrator):可登錄管理控制檯,可查看全部的信息,而且能夠對⽤戶,策略(policy)進⾏操做。
==》監控者(monitoring):可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存
使⽤狀況,磁盤使⽤狀況等)
==》策略制定者(policymaker):可登錄管理控制檯, 同時能夠對policy進⾏管理。但⽆法查看節點的相關信息(上圖紅框標識的部分)。
==》普通管理者(management):僅可登錄管理控制檯,⽆法看到節點信息,也⽆法對策略進⾏理。
==》其餘:⽆法登錄管理控制檯,一般就是普通的⽣產者和消費者。

b.建立虛擬主機
虛擬主機:爲了讓各個⽤戶能夠互不⼲擾的⼯做,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是⼀個獨⽴的訪問路徑,不一樣⽤戶使⽤不一樣路徑,各⾃有⾃⼰的隊列、交換機,互相不會影響。

c、綁定虛擬主機

6、RabbitMQ 的第⼀個程序
a.AMQP協議

b、RabbitMQ⽀持的消息模型

c.引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>

第⼀種模型(直連)

在上圖的模型中,有如下概念:
P:⽣產者,也就是要發送消息的程序
C:消費者:消息的接受者,會⼀直等待消息到來。
queue:消息隊列,圖中紅⾊部分。相似⼀個郵箱,能夠緩存消息;⽣產者向其中投遞消息,消費者從其中取出消息。

開發⽣產者
//建立鏈接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置端ip號
connectionFactory.setHost("127.0.0.1");
//設置端口號
connectionFactory.setPort(5672);
//設置用戶名和密碼
connectionFactory.setUsername("xyh");
connectionFactory.setPassword("123");
//訪問的虛擬通道
connectionFactory.setVirtualHost("/vhost");
//鏈接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數1: 是否持久化 參數2:是否獨佔隊列 參
//數3:是否⾃動刪除 參數4:其餘屬性
channel.queueDeclare("hello",false,false,false,null);
//發佈消息
channel.basicPublish("","hello",null,"hello".getBytes());
channel.close();
connection.close();

消費者
//建立鏈接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置端ip號
connectionFactory.setHost("127.0.0.1");
//設置端口號
connectionFactory.setPort(5672);
//設置用戶名和密碼
connectionFactory.setUsername("xyh");
connectionFactory.setPassword("123");
//訪問的虛擬通道
connectionFactory.setVirtualHost("/vhost");
//鏈接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
//消費消息
channel.basicConsume("hello",true,new DefaultConsumer(channel){br/>@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});

7、springboot實現簡單的rabbitmqbr/>1.導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置rabbitmq的配置信息
spring.application.name=rabbitmq-springboot
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=dt87
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/dt87
3.消息生產者
@Autowired
private RabbitTemplate rabbitTemplate;br/>@Test
public void test(){
//直接獲取rabbitmq的模板對象
rabbitTemplate.convertAndSend("hello","hello world");

}br/>四、消息的消費者
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Consumer {br/>//消費消息的方法@RabbitHandlerpublic void test(String message){System.out.println(message);}}}

相關文章
相關標籤/搜索