rabbitmq 介紹

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較爲成熟的MQ產品有IBM WEBSPHERE MQ等等。java

MQ特色


MQ是消費-生產者模型的一個典型的表明,一端往 消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。MQ和 JMS相似,但不一樣的是JMS是SUN JAVA 消息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。

使用場景


在項目中,將一些無需即時返回且耗時的操做提取出來,進行了 異步處理,而這種異步處理的方式大大的節省了服務器的 請求響應時間,從而提升了系統的吞吐量。

 

含義

編輯
RabbitMQ是一個在 AMQP基礎上完成的,可複用的企業消息系統。他遵循Mozilla Public License 開源協議

安裝

編輯
4.1)安裝ERLANG
首先,由於RabbitMQ由ERLANG實現,下載ERLANG  源代碼
解壓源代碼至ERLANG,而後進入$ERLANG目錄下
安裝依賴包:
Yum install tk
Yum install tcl
Yum install unixODBC
進入$ERLANG.編譯ERLANG
./configure –prefix=/usr/local/erlang
./make
./make install
並將erlang bin目錄加至PATH
4.2)安裝RabbitMQ
下載RabbitMQ ,解壓至$RMQ。
啓動RabbitMQ
./bin/rabbitmq-server
 

客戶端

編輯
import com.rabbitmq.client.Channel;
  import com.rabbitmq.client.Connection;
  import com.rabbitmq.client.ConnectionFactory;
public class Send {
  private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws.IOException{
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  String message = "Hello World!";
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  System.out.println(" [x] Sent '" + message + "'");
  channel.close();
  connection.close();
  }
  }
 

消費者端

編輯
public class RabbitMQRecv {
  private final static String QUEUE_NAME = "hello";
public static void main(String avg[]) throws.IOException,
  java.lang.InterruptedException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  QueueingConsumer consumer = new QueueingConsumer(channel);
  channel.basicConsume(QUEUE_NAME, true, consumer);
  while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(" [x] Received '" + message + "'");
  }
  }
  }
 

幾個概念

編輯
Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即爲符合什麼樣路由規則的消息,將會放置入哪個 消息隊列
 

消息持久

編輯
1) 將交換機置爲可持久;
2) 將通道置爲可持久
3) 消息發送時設置可持久。
當咱們「生產」了一條可持久化的消息,嘗試中斷MQ服務,啓動消費者獲取消息,消息依然可以恢復。相反,則拋出異常。
 

入門介紹


基本概念

RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。若是不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這裏簡單介紹。
RabbitMQ的結構圖以下:
幾個概念說明:
Broker:簡單來講就是消息隊列服務器實體。
  Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
  Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  producer:消息生產者,就是投遞消息的程序。
  consumer:消息消費者,就是接受消息的程序。
  channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
消息隊列的使用過程大概以下:
(1)客戶端鏈接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,並設置相關屬性。
  (3)客戶端聲明一個queue,並設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  (5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作Topic交換機,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不須要key的,叫作Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,爲了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
  (1)exchange持久化,在聲明時指定durable => 1
  (2)queue持久化,在聲明時指定durable => 1
  (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。
 

應用實際

使用Linux服務器(ubuntu 9.10 64位),安裝RabbitMQ很是方便。
先運行以下命令安裝erlang:
apt-get install erlang-nox
下載RabbitMQ的安裝包,以下安裝:
dpkg -i rabbitmq-server_2.6.1-1_all.deb
安裝完後,使用
/etc/init.d/rabbitmq-server start|stop|restart
來啓動、中止、重啓rabbitmq。
在正式應用以前,先在RabbitMQ裏建立一個vhost,加一個用戶,並設置該用戶的權限。
使用rabbitmqctl客戶端工具,在根目錄下建立」/pyhtest」這個vhost:
rabbitmqctl add_vhost /pyhtest
建立一個用戶名」pyh」,設置密碼」pyh1234″:
rabbitmqctl add_user pyh pyh1234
設置pyh用戶對/pyhtest這個vhost擁有所有權限:
rabbitmqctl set_permissions -p /pyhtest pyh 「.*」 「.*」 「.*」
後面三個」*」表明pyh用戶擁有對/pyhtest的配置、寫、讀所有權限
設置好後,開始編程,用Perl寫一個消息投遞程序(producer):
#!/usr/bin/perl
  use strict;
  use Net::RabbitMQ;
  use UUID::Tiny;
my $channel = 1000; # channel ID,能夠隨意指定,只要不衝突
  my $queuename = 「pyh_queue」; # 隊列名
  my $exchange = 「pyh_exchange」; # 交換機名
  my $routing_key = 「test」; # routing key
my $mq = Net::RabbitMQ->new(); # 建立一個RabbitMQ對象
$mq->connect(「localhost」, { vhost => 「/pyhtest」, user => 「pyh」, password => 「pyh1234″ }); # 創建鏈接
  $mq->channel_open($channel); # 打開一個channel
  $mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個持久化的交換機
  $mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個持久化的隊列
  $mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機和隊列間創建綁定
for (my $i=0;$i<10000000;$i++) { # 循環1000萬次
  my $string = create_UUID_as_string(UUID_V1); # 產生一條UUID做爲消息主體
  $mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將消息結合key以持久化模式投遞到交換機
  }
$mq->disconnect(); # 斷開鏈接
消息接受程序(consumer)大概以下:
#!/usr/bin/perl
  use strict;
  use Net::RabbitMQ;
my $channel = 1001;
  my $queuename = 「pyh_queue」;
  my $mq = Net::RabbitMQ->new();
$mq->connect(「localhost」, { vhost=>」/pyhtest」, user => 「pyh」, password => 「pyh1234″ });
  $mq->channel_open($channel);
while (1) {
  my $hashref = $mq->get($channel, $queuename);
  last unless defined $hashref;
  print $hashref->{message_count}, 「: 「, $hashref->{body},」\n」;
  }
$mq->disconnect();
consumer鏈接後只要指定隊列就可獲取到消息。
上述程序共投遞1000萬條消息,每條消息36字節(UUID),打開持久化,共耗時17分多鐘(包括產生UUID的時間),每秒投遞消息約9500條。測試機器是8G內存、8赫茲CPU。
投遞完後,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目錄,產生2G多的持久化消息數據。在運行consumer程序後,這些數據都會消失,由於消息已經被消費了。
 

集羣配置

單機環境下的集羣配置:
首先啓動兩個實例,rab和rab2,端口分別爲9991和9992
RABBITMQ_NODE_PORT=9991 RABBITMQ_NODENAME=rab rabbitmq-server -detached
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
二:中止rab2,並將其加入cluster集羣中
rabbitmqctl -n rab2 stop_app
rabbitmqctl -n rab2 join_cluster rab@rab(@rab,這裏的rab表示主機名)
從新啓動rab2
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
查看集羣的狀態
rabbitmqctl cluster_status -n rab
顯示以下信息表示集羣配置正常:
Cluster status of node rab@rab ...   [{nodes,[{disc,[rab2@rab,rab@rab]}]},   {running_nodes,[rab@rab]},   {partitions,[]}]   ...done.
相關文章
相關標籤/搜索