瞭解RabbitMQ

簡介:RabbitMQ是一套開源(MPL)的消息隊列服務軟件,是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成。數據庫

RabbitMQ的安裝與配置百度都有,我就不介紹了,畢竟這裏寫起來也是比較麻煩的。數據結構

何時須要用到RabbitMQ呢?我從一個簡單的例子來引入。函數

假設有這麼一個功能,咱們要對接收到的壓縮包進行解壓並處理,那麼咱們第一步想到的是否相似於這樣:性能

public static void main(){
       
       public void Do(){
             while(true){
                  String guid = Receive();
                  Deal(guid);
             }
       }       

       public String Receive(){
      //接收文件並返回文件標識
             return guid;
       }

       public void Deal(String guid){
             //解壓縮並處理
                 
       }  
}

這樣咱們就能把基本的功能實現了,先接收文件而後處理文件,咋一看好像沒什麼問題,可是仔細想一想,這個程序是否是有點浪費,由於接收和處理兩個方法每次執行只有一個方法在運行。那咱們繼續改一下:ui

public static void main(){
       
       public void Do(){
             while(true){
                  String guid = Receive();
             }
             while(true){
                  Deal(guid);
             }
       }       

       public String Receive(){
      //接收文件並返回文件標識
             return guid;
       }

       public void Deal(String guid){
             //解壓縮並處理
                 
       }  
}

這樣問題又來了,怎麼把guid傳給deal呢,咱們繼續改造,加入一種數據結構:spa

public static void main(){
       
       ArrayList list = new ArrayList();

       public void Do(){
             while(true){
                  String guid = Receive();
                  list.add(guid);
             }
             while(true){
                  if(list.size()>0){
                       String guid = list.get(0);
                       Deal(guid);
                       list.remove(0);
                  }
             }
       }       

       public String Receive(){
      //接收文件並返回文件標識
             return guid;
       }

       public void Deal(String guid){
             //解壓縮並處理
                 
       }  
}

咱們實際操做中會有這樣的狀況,可能接收須要1秒鐘,處理須要2秒鐘,咱們用兩個線程分別執行,那頗有可能有兩個線程同時去判斷list.size()>0,而後處理同一個對象,這是咱們不肯意看到的。線程

想一想大學時候學的隊列,是否是很符合要求,先進先出,因此咱們稍微改下,把ArrayList換成QueueList:code

public static void main(){
       
       QueueList list = new QueueList();

       public void Do(){
             while(true){
                  String guid = Receive();
                  list.enqueue(guid);
             }
             while(true){
                  String guid = list.dequeue();
                  Deal(guid);     
             }
       }       

       public String Receive(){
      //接收文件並返回文件標識
             return guid;
       }

       public void Deal(String guid){
             //解壓縮並處理
                 
       }  
}

那可能隨着數據量愈來愈龐大,一個進程沒法知足,咱們就要建立多個進程,甚至多臺機器。那麼問題又來了,隊列如何共享呢?對象

既然須要全局的隊列,那麼咱們是否須要知足下面幾點:blog

① 多個程序像鏈接數據庫一個能夠訪問同一個隊列

② 程序既能夠enqueue又能夠dequeue

③ 若是有新的數據入庫,能夠反過來通知程序去接收處理,而不是咱們程序主動掃描

④ 程序有容錯功能,宕機、停電或重啓等操做,能讓數據保留下來(持久化)

⑤ 一個數據,不能被兩個程序同時訪問,得有鎖定功能

⑥ 既然是一個獨立軟件,就不能只管理一個隊列,應該能夠管理多個

想一想咱們本身來實現,是否是挺恐怖的!

這時候消息隊列的概念就被引入了,咱們已知的有微軟的MessageQueue,開源的RabbitMQ,還有Apache的ActiveMQ

固然,若是須要在代碼中引入的話,第一步確定是引入jar包。咱們來寫一個簡單的加入隊列和取出隊列功能

public static void main() throws  IOException,TimeoutException{
      ConnectionFactory connect = new ConnectionFactory();
      //rabbitMQ IP
      connect.setHost("192.168.215.331");
      //端口號
      connect.setPort(5672);
      //用戶名
      connect.setUsername("lsd");
      //密碼
      connect.setPassword("123456");
      String queueName = "TESTMQ";
      
      Connection connection = connect.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare(queueName,true,false,false,null);
      String msg = "Hello";
      channel.basicPublish("",queueName,null,msg.getBytes("UTF-8"));
      channel.close();
      connection.close();
}

咱們在上面的程序裏面新建了一個叫作TESTMQ的隊列,而且往隊列裏面放入了一個字符串Hello

public static void main() throws  IOException,TimeoutException{
      ConnectionFactory connect = new ConnectionFactory();
      //rabbitMQ IP
      connect.setHost("192.168.215.331");
      //端口號
      connect.setPort(5672);
      //用戶名
      connect.setUsername("lsd");
      //密碼
      connect.setPassword("123456");
      String queueName = "TESTMQ";
      Connection connection = connect.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare(queueName,true,false,false,null);

      channel.basicQos();
      QueueingConsumer consumer = new Queueingconsumer(channel);
      channel.basicConsume(queuqName,false,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      byte[] byte = delivery.getBody();
      System.out.println(new String(byte));
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
}

 咱們在上面的程序裏面從一個叫作TESTMQ的隊列裏面不停的取值(由於是死循環),很容易理解吧,第一個main函數能夠不斷執行,第二個main函數都能及時取到值,先講到這兒,入門應該是夠了

相關文章
相關標籤/搜索