在介紹RabbitMQ以前,咱們須要瞭解一些最基礎的概念,相信使用過或者據說過RabbitMQ的人都不會陌生,但筆者仍是不厭其煩地在這裏講述,由於筆者的理念是self contained。html
Queue
: 隊列。計算機數據結構中的一種基本類型,遵循「先入先出」(FIFO)的原則,好比咱們平常生活中常見的排隊時的隊伍就是一個隊列。Message Queue
: 消息隊列,簡稱MQ。消息隊列本質上也是隊列,只不過隊列中的元素爲Message(消息),而消息則是服務之間最多見的通訊方式。流行的MQ框架主要有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。AMQP
:Advanced Message Queuing Protocol,是一個提供統一消息服務的應用層標準高級消息隊列協議,簡單來講,它就是一個消息列隊的協議,其標準高,要求嚴。Erlang
:Erlang是一種通用的面向併發的編程語言,它由瑞典電信設備製造商愛立信所轄的CS-Lab開發,目的是創造一種能夠應對大規模併發活動的編程語言和運行環境。RabbitMQ
:RabbitMQ是一個實現了AMQP高級消息隊列協議的消息隊列服務,用Erlang語言實現。RabbitMQ的運行原理以下圖(後續咱們會解釋其中的含義,現階段只做爲瀏覽):以上是咱們對RabbitMQ的最初認識。接下來咱們還須要瞭解RabbitMQ的下載與安裝,以下:java
說了這麼多,咱們爲何要選擇RabbitMQ,也就是說它的優點
又是什麼呢?RabbitMQ的強大之處在於:python
帶着對RabbitMQ的初次見面,咱們不妨再瞭解下如何簡單地使用RabbitMQ。編程
在計算機領域中,每次學習一個新事物的驚喜,每每都是伴隨着Hello World
。在編程語言中,會有輸出「Hello World」
;在大數據中,「Hello World」就是統計單詞的詞頻
;在Docker中,就是使用「Hello World」鏡像
;在RabbitMQ,此次的「Hello World」就是生產者發送「Hello World」,而消費者輸出「Hello World」
。
RabbitMQ就是消息代理,它接受並推進消息流動。你能夠把它想象成一個郵局:當你把一封信塞進郵箱,你須要確保它能送到收信人的手裏。而RabbitMQ就是一個郵箱,郵局,郵遞員。不一樣於真實的郵局(處理信件),RabbitMQ處理接受、存儲、推進消息。
在RabbitMQ,或者消息隊列領域中,有以下術語。小程序
生產者(Producer)
:生產者僅產生消息,也就說一個產生消息的程序就是生產者。對應於郵局的例子,生產者就是寄信人,由於他們產生信件。隊列(Queue)
: 一個隊列就是RabbitMQ中的郵箱。儘管消息會在RabbitMQ和應用程序之間流動,可是它們只會在隊列中存儲。一個隊列僅受限於硬盤和內存大小,它是一個大的消息緩存區。許多生產者產生消息後會進入一個隊列,許多消費者也會從同一個隊列中獲取消息。如下是咱們如何表示一個隊列:消費者(Consumer)
:消費消息與接收消息的意思是一致的。一個消費者每每會等待接收消息。在郵局的例子中,消費者也許就是收信人。 介紹完生產者、隊列、消費者後,咱們將會來學習RabbitMQ中的Hello World。
咱們使用Python的Pika模塊來操做RabbitMQ。在本文中,咱們將會編寫兩個小程序:一個生產者(Producer)發送一條消息,而一個消費者(Consumer)將會接收這個消息並將它輸出。這就是消息通訊的「Hello World」。
在下圖中,P表明生產者,C表明消費者,中間的盒子表明隊列——消息緩存區。咱們總的設計圖以下:api
生產者會將消息發送至「hello」隊列,消費者從從該隊列中獲取消息。緩存
在這一部分中,咱們將會讓生產者來發送消息。網絡
咱們的第一個程序send.py
將會發送一個消息至隊列。首先咱們要作的是創建與RabbitMQ Server的鏈接。數據結構
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
咱們鏈接到了本地機器(localhost
)的一個代理。若是咱們須要鏈接不一樣機器的代理,咱們只須要聲明機器名稱以及IP地址便可。
接着,在咱們發送消息以前,咱們須要確認隊列是否存在。若是咱們發送消息到一個不存在的地方,RabbitMQ將會丟失這條消息。所以,咱們須要建立一個hello
隊列,這裏將是消息傳遞的地方。併發
channel.queue_declare(queue='hello')
咱們已經準備好發送消息了。咱們的第一條消息是字符串「Hello World!」,咱們將它發送至hello
隊列。
在RabbitMQ中,消息不會被直接發送至隊列,它須要經過exchange
才能作到。在這裏咱們不須要了解exchange
的原理,咱們只須要知道,空字符串就表明默認的exchange
。該exchange
很特殊——它規定了咱們的消息往哪一個隊列走。隊列名稱須要用routing_key
這個參數來聲明:
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在退出程序前,咱們須要確保網絡緩存被清空而且咱們的消息確實被傳送至RabbitMQ。通常咱們經過關閉鏈接來實現。
connection.close()
在這一部分中,咱們將會讓消費者來接受消息。
咱們的第二個程序receive.py
將會從隊列中接受消息,並把其輸出出來。
一樣地,第一步是鏈接到RabbitMQ Server。這部分的代碼與以前的部分相同。
下一步,更以前同樣,須要確保隊列存在。使用queue_declare
來建立隊列是冪等的(idempoten) —— 咱們能夠運行這條命令不少次,但只會建立一個隊列。
channel.queue_declare(queue='hello')
也許你會好奇咱們爲何要再一次聲明這個列隊,明明咱們在以前的代碼中已經聲明過了。這裏這麼作主要是爲了確保隊列已經存在。舉例來講,這邊是先運行send.py
,但咱們不能肯定哪個程序會先運行。所以在這樣的狀況下,在兩個程序中反覆聲明列隊是不錯的方式。
從隊列中接受消息更加複雜。他須要經過callback
函數與列隊關聯。不管何時咱們接受到消息,這個callback
函數都被會Pika模塊調用。在咱們的例子中,這個函數將會輸出消息的內容。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
下一步,咱們須要告訴RabbitMQ,在hello
隊列中,這個特定的callback
函數須要接受消息。
channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
auto_ack
參數的含義會在後面的文章中解釋。
最後咱們建立一個永不中止的循環,用於接收消息:
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
上面的部分介紹了「Hello World」的理論方面,接下來,咱們會分別使用Python和Java程序來分別實現這個例子。
sent.py程序以下:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World from Python!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py程序以下:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
先啓動receive.py,程序會提示「 [*] Waiting for messages. To exit press CTRL+C」,代表該消費者在等待接收消息。在運行sent.py,該程序會發送「Hello World from Python!」至隊列,同時receive.py會輸出該消息。每運行一次sent.py,receive.py會就會輸出一個該消息,以下圖:
咱們使用Gradle來構建這個項目,項目結構以下:
在build.gradle中,咱們引入第三方jar包,內容以下:
plugins { id 'java' } group 'rabbitmq' version '1.0-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0' // https://mvnrepository.com/artifact/org.slf4j/slf4j-api compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26' // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26' }
Send.java代碼以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World from Java!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
Recv.java的代碼以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
具體的操做方法同Python同樣。
若是咱們把Python的「Hello World」當作一個簡單的小系統,而Java的「Hello World」也當作一個簡單的小系統,那麼RabbitMQ能夠溝通這兩個系統,這也是RabbitMQ的一個特定:系統對接。
咱們在Python中運行receive.py
,而運行Java的Send.java
三次,運行Python的sent.py
兩次,結果以下:
這樣的測試結果是使人吃驚的,由於咱們用RabbitMQ打通了兩個不一樣語言的系統!
本文做爲RabbitMQ入門的第一篇,但願能對你們有所幫助。筆者也是初學RabbitM,文章中確定有不足之處,懇請你們批評指正。
感謝你們的閱讀~