RabbitMQ能夠被比做一個郵局,當你向郵局寄一封信時,郵局會保證將這封信送達你寫的收件人,而RabbitMQ與郵局最主要的區別是,RabbitMQ並不真的處理信件,它處理的是二進制的數據塊,它除了接收「信件」,他還要負責處理數據(好比保存數據)、轉發數據(實際上送信是郵遞員的職責)等。docker
在RabbitMQ中有如下幾個概念:異步
一個生產者僅僅負責向RabbitMQ中發送消息,除此以外它不會完成其餘工做。函數
隊列由RabbitMQ自身提供,它的做用就像是郵箱和郵局的關係同樣。雖然看似RabbitMQ和你的應用直接通訊,可是實際上消息是被存儲到隊列中的,實際上,隊列就是一個消息緩衝區,受宿主環境內存和磁盤空間的限制。多個生產者能夠將消息發送到一個隊列,而多個消費者能夠從一個隊列中接收消息。測試
一個消費者僅僅負責接收來自生產者的消息。spa
這裏須要注意如下兩點:.net
咱們須要創建兩個.net core項目,分別是發佈端、接收端,它們由消息隊列進行鏈接,消息隊列會保存消息,直到消息被髮送給接收端,如圖所示。3d
建立項目後,兩個項目均需安裝RabbitMQ.Client的Nuget包,並restore。rest
在正式開始編寫發送端代碼以前,咱們須要安裝RabbitMQ,這裏我在Docker上安裝RabbitMQ,所用系統爲CentOS7。
首先創新新的文件夾目錄/docker/rabbitmq/data,用做RabbitMQ容器的數據卷掛載。code
而後運行下面的語句,若是本地沒有鏡像,Docker會自動去Dockerhub上拉取鏡像:blog
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq --volume /rabbitmq/data:/var/lib/rabbitmq rabbitmq:management
鏡像下載完畢後能夠看到容器已經運行,其中,15671用於外網訪問,5672用於內網鏈接。
訪問http://ip:15671,不出意外你能夠進入到RabbitMQ的可視化管理界面。
下面能夠開始編寫生產者端的代碼,以下所示。
using System; using System.Text; using RabbitMQ.Client; namespace Send { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "148.70.210.208" }; //鏈接RabbitMQ using (var connection = factory.CreateConnection()) { //建立一個信道 using (var channel = connection.CreateModel()) { //聲明一個Hello隊列,信道就與Hello隊列鏈接起來了 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; //消息體 var body = Encoding.UTF8.GetBytes(message); //經過信道,將消息發送到隊列 //須要指明隊列的名稱,才能轉發到特定隊列 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent [0]", message); } Console.WriteLine("Press [enter] to exit."); Console.ReadLine(); } } } }
程序首先經過Socket與RabbitMQ進行了鏈接,鏈接成功後,建立了一個信道,並經過這個信道聲明瞭一個名叫"hello"的隊列,這樣,隊列就和這個信道綁定起來,在發送數據時,只須要在routingKey中指明所用的隊列名,消息就會經過direct exchange的模式被髮送到指明名稱的隊列中,這就好像你在網上買了商品,而後你告訴賣家,必定要發某個快遞同樣,這樣,你的商品就會經過這個快遞公司郵寄給你。
direct exchange模式如圖所示。
整個流程能夠以下進行歸納,首先創建發佈端和RabbitMQ進行Socket鏈接,而且聲明Socket鏈接中的信道,經過這個信道聲明特定名稱的隊列,這樣,數據就能夠經過這個信道發送到RabbitMQ的隊列中,等待被轉發到消費端,這與官網給出的流程圖一模一樣。
下面咱們就能夠開始編寫消費端的程序,對於消費者端來講,它一直在進行監聽,咱們須要一直運行咱們的監聽程序,一旦收到消息,就把消息經過控制檯進行打印。
打開咱們的Recieve項目,代碼以下所示。
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Receive { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "your ip address" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Received message:{message}"); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine("Press [enter] to exit"); Console.ReadLine(); } } } } }
一開始的代碼與發送端一致,創建鏈接、建立信道、聲明隊列,須要注意這裏聲明的隊列名須要與發送端綁定的隊列名一致。這裏之因此從新聲明隊列,是爲了保證當接收端程序比發送端先啓動時,隊列依然是存在的,可是須要保證名稱同樣。
以上準備工做完成後,接收端能夠開始異步的接收隊列中的數據了。咱們使用EventBasicConsumer.Received事件來綁定接收完成的回調函數。即:當消費端成功從"hello"隊列中接收到數據後,就會調用這個匿名的lambda表達式,從消息體中獲取數據,將數據轉碼成字符串格式而且打印到控制檯中,至此,一條消息的處理就完畢了。
須要注意,在消費端收到信息後,須要給客戶端確認,即返回autoAck,若是沒有返回,客戶端會默認消費者沒有收到,消息會存儲在隊列中,直到被消費。這裏設置autoAck:true,則表示消費者一旦收到消息,則馬上自動的向客戶端迴應,這樣,消息就會從隊列中刪除。
運行代碼能夠看到,不管先運行消費端仍是生產者端的代碼,消費端都能收到消息:
若是先運行生產者端程序,消息會存儲在消息隊列中,當運行消費者端程序後,保存在消息隊列中的消息將會被消費,進而從隊列中刪除。
例如,咱們先不運行消費者端程序,只運行生產者端,這是咱們刷新RabbitMQ的可視化界面。
能夠看到有一條消息處於Ready狀態,可是還未被消費,因此被存儲,咱們關閉生產者端程序,再次運行,能夠看到隊列中未被消費的消息變爲兩條。
這時,咱們運行消費者端程序,運行成功後能夠看到,消費者端一次收到了兩條消息,而且隊列中的消息被清除。
因而可知,對於RabbitMQ的消息隊列,生產者和消費者端是徹底分離的。 下一講咱們將繼續探索RabbitMQ的工做隊列。