官方文檔:http://www.rabbitmq.com/tutorials/tutorial-one-python.htmlhtml
下文爲翻譯和erlang客戶端的例子python
RabbitMQ是一個消息中間件,它主要的思想很是簡單:接收和發送消息。你能夠把RabbitMQ想象成一個郵局:當你把信件放入郵筒的時候你很是肯定郵遞員會把你的信件投遞給信件的接收者。在這個比喻裏面,RabbitMQ是一個郵筒,郵局以及郵遞員。緩存
RabbitMQ與郵局的主要區別在於,RabbitMQ不處理信件,取而代之的是它接收,存儲而且發送二進制數據,也就是所謂的消息。服務器
RabbitMQ 與 messaging 一般狀況下,使用一些術語。網絡
生產也就是發送的意思,發送消息的程序叫作生產者,咱們把它畫成這樣,使用"p";ide
隊列是郵箱的名字,它存在於RabbitMQ的內部,雖然消息在RabbitMQ和你的應用程序之間流轉,可是消息只能被存儲在一個隊列裏面。隊列沒有任何限制,只要你願意,它能夠存儲任意多個消息,本質上,隊列是一個沒有限制的buffer。一個隊列能夠接收多個生產者的消息,多個消費者也能夠從一個隊列裏面接收消息。隊列通常用下圖表示,上面是隊列的名字:函數
消費其實就是接收的意思。一個消費者就是一個等待接收消息的應用程序。消費者通常用下圖表示:oop
值得注意的是,生產者,消費者和RabbitMQ不須要部署在同一臺物理機上,事實上在不少應用裏面他們也確實沒有部署在同一臺物理機上。ui
Hello Worldspa
咱們的"Hello world"程序不復雜 :發送一個消息,接收消息和打印到屏幕上。完成這些須要兩個程序:一個發送消息程序,和一個接收並打印消息程序。
總體的過程以下:
生產者發送消息到"hello"隊列。消費者從這個隊列接收消息。
Sending
咱們第一個程序 send.erl 將發送一個消息到隊列。
1.咱們須要創建一個到RabbitMQ server的鏈接。
{ok, Connection} =amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection),
咱們如今已經鏈接上了一個本地的RabbitMQ服務器,由於咱們給的參數是」localhost」。若是咱們想鏈接到不一樣機器上的RabbitMQ服務器,咱們能夠把」localhost」改爲相應機器的名字或者IP地址。
2:在發送消息以前,咱們必須確保接收消息的隊列是存在的。若是咱們把消息發送到一個不存在的隊列,RabbitMQ會把咱們的消息看成垃圾,不予處理。讓咱們來建立一個隊列,而且把它命名爲hello:
amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}),
如今咱們能夠準備發送一個消息。第一個消息僅僅包含一個"Hello world!"字符串。咱們想要發送到咱們的hello 隊列。
3:在RabbitMQ裏面,消息是不能被直接發送到隊列裏面去的,在消息發送到隊列以前必需要先通過一個「exchange」,如今咱們不討論exchange的具體細節,如今咱們只須要知道怎麼使用一個由空字符串標誌的默認exchange,這個exchange很是特殊---它容許咱們指定一個具體的消息要投遞的隊列。隊列的名字在參數routing_key裏面指定:
amqp_channel:cast(Channel, #'basic.publish'{ exchange = <<"">>, routing_key = <<"hello">>}, #amqp_msg{payload = <<"Hello World!">>}),
在退出程序以前,咱們須要確認刷新網絡緩存區,咱們的消息真正的發送到RabbitMQ。 咱們能夠關閉鏈接來達到這個目的。
ok = amqp_channel:close(Channel), ok = amqp_connection:close(Connection),
Receiving
咱們的第二個程序時receive1.erl,將會接收消息從隊列並在屏幕上打印出來。
構造消費者應用程序的步驟:
1:創建一個到RabbitMQ服務器的鏈接,與生產者的步驟同樣:
{ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection),
2:確保消息隊列的存在。咱們使用queue_declare來建立隊列。queue_declare能夠被調用屢次,可是隻有一個調用會建立queue:
amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>})
你可能會問,爲何咱們要重複地建立隊列---咱們已經在生產者那邊建立了這個隊列。由於你不能保證生產者和消費者誰先啓動,因此咱們在兩邊都對隊列進行建立操做。
3:從隊列接收消息要相對來講複雜些。接收消息經過在隊列上綁定一個回調函數來實現,無論何時咱們接收到一個消息,這個回調函數都會被調用。在hello world這個例子中,回調函數的做用就是把收到的消息打印出來:
amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"hello">>, no_ack = true}, self()),
loop(Channel) -> receive {#'basic.deliver'{}, #amqp_msg{payload = Body}} -> io:format(" [x] Received ~p~n", [Body]), loop(Channel) end.
4:而後咱們告訴RabbitMQ收到消息時,給本身發送消息。
receive #'basic.consume_ok'{} -> ok end,
完整代碼
send.erl (發送消息)
-module(send). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(_) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}), amqp_channel:cast(Channel, #'basic.publish'{ exchange = <<"">>, routing_key = <<"hello">>}, #amqp_msg{payload = <<"Hello World!">>}), io:format(" [x] Sent 'Hello World!'~n"), ok = amqp_channel:close(Channel), ok = amqp_connection:close(Connection), ok.
receive1.erl(接收消息)
-module(receive1). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(_) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}), io:format(" [*] Waiting for messages. To exit press CTRL+C~n"), amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"hello">>, no_ack = true}, self()), receive #'basic.consume_ok'{} -> ok end, loop(Channel). loop(Channel) -> receive {#'basic.deliver'{}, #amqp_msg{payload = Body}} -> io:format(" [x] Received ~p~n", [Body]), loop(Channel) end.
erlang 客戶端: